CLOVER🍀

That was when it all began.

ElasticsearchのIngest Nodeを試す

これは、なにをしたくて書いたもの?

Filebeatを見ていて、パイプラインやモジュールを覚えようと思うと、Ingest Nodeを知らなければいけないなぁと思い。

ちょっと、Ingest Nodeを試してみることにしました。

環境

今回の環境は、こちら。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 18.04.3 LTS
Release:    18.04
Codename:   bionic


$ java --version
openjdk 11.0.4 2019-07-16
OpenJDK Runtime Environment (build 11.0.4+11-post-Ubuntu-1ubuntu218.04.3)
OpenJDK 64-Bit Server VM (build 11.0.4+11-post-Ubuntu-1ubuntu218.04.3, mixed mode, sharing)

Elasticsearchは、7.5.0を使います。

$ curl localhost:9200?pretty
{
  "name" : "ubuntu1804.localdomain",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "D2pCI3ACR4ywifhayB7sqw",
  "version" : {
    "number" : "7.5.0",
    "build_flavor" : "default",
    "build_type" : "deb",
    "build_hash" : "e9ccaed468e2fac2275a3761849cbee64b39519f",
    "build_date" : "2019-11-26T01:06:52.518245Z",
    "build_snapshot" : false,
    "lucene_version" : "8.3.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}

Ingest Nodeとは?

Ingest Nodeとは、Elasticsearchのインデックスとしてドキュメントを登録する前に、前処理を行う仕組みです。

Ingest node | Elasticsearch Reference [7.5] | Elastic

インデックスへの登録やバルクAPIに割り込んで、ドキュメントを変換してから登録を行います。

概要や、Logstashとの違いは、こちらも参考に。

Ingest Node: Voxxed Luxembourg - Speaker Deck

Should I use Logstash or Elasticsearch ingest nodes? | Elastic Blog

Ingest Nodeはデフォルトで有効になっていて、前処理を行うプロセッサーをパイプラインとして定義することで使えるようになります。
ひとつのパイプライン中で、プロセッサーは複数利用することができ、いろいろな処理を行うことができます。

パイプラインは、ドキュメントの登録時にパラメーターで指定するものですが、インデックステンプレートにデフォルトで適用される
パイプラインを指定することもできるみたいです。

Index modules / Dynamic index settings / index.default_pipeline

パイプラインと、登録するプロセッサーの書き方はこちら。

Pipeline Definition | Elasticsearch Reference [7.5] | Elastic

Processors | Elasticsearch Reference [7.5] | Elastic

条件分岐や、エラーハンドリングもできます。

Conditional Execution in Pipelines | Elasticsearch Reference [7.5] | Elastic

Handling Failures in Pipelines | Elasticsearch Reference [7.5] | Elastic

Ingest Nodeで使えるプラグインもあったり、

Ingest Plugins | Elasticsearch Plugins and Integrations [7.5] | Elastic

また、パイプラインの登録や参照、削除、動作確認はREST APIで行うことができます。

Ingest APIs | Elasticsearch Reference [7.5] | Elastic

ドキュメントなどはこのくらいにして、実際に動かしてみましょう。

初めてのIngest Node

初めてのIngest Nodeということで。

まずは、Set Processorを使って、ドキュメントにフィールドを追加してみましょう。

Set Processor | Elasticsearch Reference [7.5] | Elastic

「hello-pipeline」というパイプライン名で、フィールド「label」を追加するパイプラインを登録してみます。

$ curl -XPUT -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/hello-pipeline -d '
{
    "description": "Hello! my first pipeline",
    "processors": [
        {
            "set": {
                "field": "label",
                "value": "field added by pipeline"
            }
        }
    ]
}
'

動作確認してみましょう。Simulate pipeline APIを使って、確認することができます。

こんな感じでURLとドキュメントを指定して、実行。今回は、ドキュメントは2つ指定してあります。

$ curl -XPOST -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/hello-pipeline/_simulate?pretty -d '
{
  "docs": [
    {
      "_source": {
        "field1": "hello"
      }
    },
    {
      "_source": {
        "field1": "world"
      }
    }
  ]
}
'

結果は、こんな感じに。

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "field1" : "hello",
          "label" : "field added by pipeline"
        },
        "_ingest" : {
          "timestamp" : "2019-12-06T15:19:09.559123Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "field1" : "world",
          "label" : "field added by pipeline"
        },
        "_ingest" : {
          "timestamp" : "2019-12-06T15:19:09.559137Z"
        }
      }
    }
  ]
}

「label」というフィールドが、指定した値(「field added by pipeline」)で追加されましたね。

今度は、ドキュメントを登録してみます。この時に、「pipeline」パラメーターで適用したいパイプラインを指定します。

$ curl -XPUT -H 'Content-Type: application/json' localhost:9200/index/_doc/1?pipeline=hello-pipeline -d '
{
    "field1": "world"
}
'

確認。

$ curl localhost:9200/index/_doc/1?pretty
{
  "_index" : "index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "field1" : "world",
    "label" : "field added by pipeline"
  }
}

こちらも、フィールドが追加されていることが確認できました。

使わなくなったパイプラインを、削除。

$ curl -XDELETE localhost:9200/_ingest/pipeline/hello-pipeline

ところで、Simulate pipeline APIは、パイプラインを実際に登録しなくても、パイプライン定義と対象のドキュメントを与えると
動作させることができます。こんな感じで。

$ curl -XPOST -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/_simulate?pretty -d '
{
    "pipeline":
    {
        "description": "Hello! my first pipeline",
        "processors": [
            {
                "set": {
                    "field": "label",
                    "value": "field added by pipeline"
                }
            }
        ]
    },
    "docs": [
        {
            "_source": {
                "field1": "hello"
            }
        },
        {
            "_source": {
                "field1": "world"
            }
        }
    ]
}
'

結果。

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "field1" : "hello",
          "label" : "field added by pipeline"
        },
        "_ingest" : {
          "timestamp" : "2019-12-06T15:28:03.302141Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "field1" : "world",
          "label" : "field added by pipeline"
        },
        "_ingest" : {
          "timestamp" : "2019-12-06T15:28:03.30215Z"
        }
      }
    }
  ]
}

覚えておきましょう。

もうちょっと長いパイプラインを書く

次は、もうちょっと長いパイプラインを書いてみましょう。

お題として、ApacheアクセスログをGrok Processorでパースしてあれこれしてみましょう。

Grok Processor | Elasticsearch Reference [7.5] | Elastic

次のようなアクセスログを対象にします。

127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] "GET /index.html HTTP/1.1" 200 11173 "-" "curl/7.58.0"

まあ、Apacheアクセスログのデフォルトフォーマットなのですが。

LogFormat "%h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\"" combined

パイプラインは、こんな感じで書いてみました。

$ curl -XPUT -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline -d '
{
    "description": "parsing log pipeline",
    "processors": [
        {
            "grok": {
                "field": "message",
                "patterns": [
                    "%{IPORHOST:source_address} - %{DATA:user_name} \\[%{HTTPDATE:access_time}\\] \"(?:%{WORD:http_request_method} %{DATA:url} HTTP/%{NUMBER:http_version}|-)?\" %{NUMBER:http_response_status_code:long} (?:%{NUMBER:http_response_body_bytes:long}|-)( \"%{DATA:http_request_referrer}\")?( \"%{DATA:user_agent}\")?"
                ]
            }
        }
    ],
    "on_failure": [
        {
            "set": {
                "field": "error",
                "value": "access log parse failed"
            }
        }
    ]
}
'

Grokの書き方は、こんな感じでパターンとキャプチャするフィールド名、型を指定できます。

The syntax for reusing a grok pattern comes in three forms: %{SYNTAX:SEMANTIC}, %{SYNTAX}, %{SYNTAX:SEMANTIC:TYPE}.

確認。

$ curl -XPOST -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline/_simulate?pretty -d '
{
  "docs": [
    {
      "_source": {
        "message": "127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] \"GET /index.html HTTP/1.1\" 200 11173 \"-\" \"curl/7.58.0\""
      }
    }
  ]
}
'

こんな感じで、パースできました。

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "http_request_referrer" : "-",
          "http_response_status_code" : 200,
          "user_name" : "-",
          "http_version" : "1.1",
          "http_request_method" : "GET",
          "message" : "127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] \"GET /index.html HTTP/1.1\" 200 11173 \"-\" \"curl/7.58.0\"",
          "url" : "/index.html",
          "http_response_body_bytes" : 11173,
          "source_address" : "127.0.0.1",
          "user_agent" : "curl/7.58.0",
          "access_time" : "05/Dec/2019:15:33:19 +0000"
        },
        "_ingest" : {
          "timestamp" : "2019-12-06T15:41:51.702802Z"
        }
      }
    }
  ]
}

また、今回は「on_failure」要素を入れてあるので

    "on_failure": [
        {
            "set": {
                "field": "error",
                "value": "access log parse failed"
            }
        }
    ]

パースに失敗するようなデータを入れると

$ curl -XPOST -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline/_simulate?pretty -d '
{
  "docs": [
    {
      "_source": {
        "message": "bad pattern"
      }
    }
  ]
}
'

この定義では、Set Processorでフィールドが追加されます。

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : "bad pattern",
          "error" : "access log parse failed"
        },
        "_ingest" : {
          "timestamp" : "2019-12-06T15:46:58.738005Z"
        }
      }
    }
  ]
}

先ほどの「on_failure」の例は、パイプライン全体として失敗した場合の扱いですが、こんな感じでプロセッサー単位に付けることも
できます。

$ curl -XPUT -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline -d '
{
    "description": "parsing log pipeline",
    "processors": [
        {
            "grok": {
                "field": "message",
                "patterns": [
                    "%{IPORHOST:source_address} - %{DATA:user_name} \\[%{HTTPDATE:access_time}\\] \"(?:%{WORD:http_request_method} %{DATA:url} HTTP/%{NUMBER:http_version}|-)?\" %{NUMBER:http_response_status_code:long} (?:%{NUMBER:http_response_body_bytes:long}|-)( \"%{DATA:http_request_referrer}\")?( \"%{DATA:user_agent}\")?"
                ],
                "on_failure": [
                    {
                        "set": {
                            "field": "error",
                            "value": "access log parse failed"
                        }
                    }
                ]
            }
        }
    ]
}
'

結果は同じなので、割愛。

ですが、この「on_failure」パイプラインに書くのか、プロセッサー単位に書くのかで、その後にさらにプロセッサーが続くはずだった
場合には、挙動が変わります。

違いは、次の2ページを見るとよいでしょう。

https://speakerdeck.com/elastic/ingest-node-voxxed-luxembourg?slide=29

https://speakerdeck.com/elastic/ingest-node-voxxed-luxembourg?slide=31

他にも、処理対象がなかった時に無視するかどうかの、「ignore_missing」などもあったりします。

ところで、こんな感じにパース後のフィールド名に「.」を入れてパースすると

$ curl -XPUT -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline -d '
{
    "description": "parsing log pipeline",
    "processors": [
        {
            "grok": {
                "field": "message",
                "patterns": [
                    "%{IPORHOST:source_address} - %{DATA:user_name} \\[%{HTTPDATE:access_time}\\] \"(?:%{WORD:http_request.method} %{DATA:url} HTTP/%{NUMBER:http_request.version}|-)?\" %{NUMBER:http_response.status_code:long} (?:%{NUMBER:http_response.body_bytes:long}|-)( \"%{DATA:http_request.referrer}\")?( \"%{DATA:http_request.user_agent}\")?"
                ]
            }
        }
    ],
    "on_failure": [
        {
            "set": {
                "field": "error",
                "value": "access log parse failed"
            }
        }
    ]
}
'

ネストした構造になります。

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "http_response" : {
            "body_bytes" : 11173,
            "status_code" : 200
          },
          "user_name" : "-",
          "source_address" : "127.0.0.1",
          "http_request" : {
            "referrer" : "-",
            "method" : "GET",
            "version" : "1.1",
            "user_agent" : "curl/7.58.0"
          },
          "message" : "127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] \"GET /index.html HTTP/1.1\" 200 11173 \"-\" \"curl/7.58.0\"",
          "url" : "/index.html",
          "access_time" : "05/Dec/2019:15:33:19 +0000"
        },
        "_ingest" : {
          "timestamp" : "2019-12-06T15:44:44.967901Z"
        }
      }
    }
  ]
}

さて、話を戻しまして。最初にパースした結果は、こんな感じでした。

      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "http_request_referrer" : "-",
          "http_response_status_code" : 200,
          "user_name" : "-",
          "http_version" : "1.1",
          "http_request_method" : "GET",
          "message" : "127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] \"GET /index.html HTTP/1.1\" 200 11173 \"-\" \"curl/7.58.0\"",
          "url" : "/index.html",
          "http_response_body_bytes" : 11173,
          "source_address" : "127.0.0.1",
          "user_agent" : "curl/7.58.0",
          "access_time" : "05/Dec/2019:15:33:19 +0000"
        },

オリジナルの「message」というフィールドは、もう要らないかもしれませんね。Remove Processorで、削除してみましょう。

Remove Processor | Elasticsearch Reference [7.5] | Elastic

Remove Processorを追加して、パイプラインを再登録。

$ curl -XPUT -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline -d '
{
    "description": "parsing log pipeline",
    "processors": [
        {
            "grok": {
                "field": "message",
                "patterns": [
                    "%{IPORHOST:source_address} - %{DATA:user_name} \\[%{HTTPDATE:access_time}\\] \"(?:%{WORD:http_request_method} %{DATA:url} HTTP/%{NUMBER:http_version}|-)?\" %{NUMBER:http_response_status_code:long} (?:%{NUMBER:http_response_body_bytes:long}|-)( \"%{DATA:http_request_referrer}\")?( \"%{DATA:user_agent}\")?"
                ]
            }
        },
        {
            "remove": { "field": "message" }
        }
    ],
    "on_failure": [
        {
            "set": {
                "field": "error",
                "value": "access log parse failed"
            }
        }
    ]
}
'

結果。

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "http_request_referrer" : "-",
          "http_response_status_code" : 200,
          "user_name" : "-",
          "http_version" : "1.1",
          "http_request_method" : "GET",
          "url" : "/index.html",
          "http_response_body_bytes" : 11173,
          "source_address" : "127.0.0.1",
          "user_agent" : "curl/7.58.0",
          "access_time" : "05/Dec/2019:15:33:19 +0000"
        },
        "_ingest" : {
          "timestamp" : "2019-12-06T16:05:59.137718Z"
        }
      }
    }
  ]
}

「message」フィールドがなくなりました。

最後に、ちょっと乱暴ですが、ステータスコードが200だったら「result」に「OK」、400以上だったら「NG」とするSet Processor
ifと共に追加してみましょう。

$ curl -XPUT -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline -d '
{
    "description": "parsing log pipeline",
    "processors": [
        {
            "grok": {
                "field": "message",
                "patterns": [
                    "%{IPORHOST:source_address} - %{DATA:user_name} \\[%{HTTPDATE:access_time}\\] \"(?:%{WORD:http_request_method} %{DATA:url} HTTP/%{NUMBER:http_version}|-)?\" %{NUMBER:http_response_status_code:long} (?:%{NUMBER:http_response_body_bytes:long}|-)( \"%{DATA:http_request_referrer}\")?( \"%{DATA:user_agent}\")?"
                ]
            }
        },
        {
            "remove": { "field": "message" }
        },
        {
            "set": {
                "if": "ctx.http_response_status_code == 200",
                "field": "result",
                "value": "OK"
            }
        },
        {
            "set": {
                "if": "ctx.http_response_status_code >= 400",
                "field": "result",
                "value": "NG"
            }
        }
    ],
    "on_failure": [
        {
            "set": {
                "field": "error",
                "value": "access log parse failed"
            }
        }
    ]
}
'

確認。

$ curl -XPOST -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline/_simulate?pretty -d '
{
  "docs": [
    {
      "_source": {
        "message": "127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] \"GET /index.html HTTP/1.1\" 200 11173 \"-\" \"curl/7.58.0\""
      }
    }
  ]
}
'

「OK」が入りました。

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "http_request_referrer" : "-",
          "http_response_status_code" : 200,
          "user_name" : "-",
          "http_version" : "1.1",
          "http_request_method" : "GET",
          "url" : "/index.html",
          "result" : "OK",
          "http_response_body_bytes" : 11173,
          "source_address" : "127.0.0.1",
          "user_agent" : "curl/7.58.0",
          "access_time" : "05/Dec/2019:15:33:19 +0000"
        },
        "_ingest" : {
          "timestamp" : "2019-12-06T16:08:07.124479Z"
        }
      }
    }
  ]
}

「400」や「503」を入れてみます。

$ curl -XPOST -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline/_simulate?pretty -d '
{
  "docs": [
    {
      "_source": {
        "message": "127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] \"GET /index.html HTTP/1.1\" 400 11173 \"-\" \"curl/7.58.0\""
      }
    }
  ]
}
'


$ curl -XPOST -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline/_simulate?pretty -d '
{
  "docs": [
    {
      "_source": {
        "message": "127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] \"GET /index.html HTTP/1.1\" 503 11173 \"-\" \"curl/7.58.0\""
      }
    }
  ]
}
'

それぞれの結果。

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "http_request_referrer" : "-",
          "http_response_status_code" : 400,
          "user_name" : "-",
          "http_version" : "1.1",
          "http_request_method" : "GET",
          "url" : "/index.html",
          "result" : "NG",
          "http_response_body_bytes" : 11173,
          "source_address" : "127.0.0.1",
          "user_agent" : "curl/7.58.0",
          "access_time" : "05/Dec/2019:15:33:19 +0000"
        },
        "_ingest" : {
          "timestamp" : "2019-12-06T16:08:57.00472Z"
        }
      }
    }
  ]
}


{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "http_request_referrer" : "-",
          "http_response_status_code" : 503,
          "user_name" : "-",
          "http_version" : "1.1",
          "http_request_method" : "GET",
          "url" : "/index.html",
          "result" : "NG",
          "http_response_body_bytes" : 11173,
          "source_address" : "127.0.0.1",
          "user_agent" : "curl/7.58.0",
          "access_time" : "05/Dec/2019:15:33:19 +0000"
        },
        "_ingest" : {
          "timestamp" : "2019-12-06T16:09:20.963558Z"
        }
      }
    }
  ]
}

OKそうですね。

まとめ

ElasticsearchのIngest Nodeを、軽く試してみました。

最初はJSONで書かれたプロセッサーに面食らいましたが、ちゃんと見るとなんとか書けていけそうです。

エラーハンドリングなども合わせて、ちょっとずつ覚えていきましょう。