これは、なにをしたくて書いたもの?
Filebeatを見ていて、パイプラインやモジュールを覚えようと思うと、Ingest Nodeを知らなければいけないなぁと思い。
ちょっと、Ingest Nodeを試してみることにしました。
環境
今回の環境は、こちら。
$ lsb_release -a No LSB modules are available. Distributor ID: Ubuntu Description: Ubuntu 18.04.3 LTS Release: 18.04 Codename: bionic $ java --version openjdk 11.0.4 2019-07-16 OpenJDK Runtime Environment (build 11.0.4+11-post-Ubuntu-1ubuntu218.04.3) OpenJDK 64-Bit Server VM (build 11.0.4+11-post-Ubuntu-1ubuntu218.04.3, mixed mode, sharing)
Elasticsearchは、7.5.0を使います。
$ curl localhost:9200?pretty { "name" : "ubuntu1804.localdomain", "cluster_name" : "elasticsearch", "cluster_uuid" : "D2pCI3ACR4ywifhayB7sqw", "version" : { "number" : "7.5.0", "build_flavor" : "default", "build_type" : "deb", "build_hash" : "e9ccaed468e2fac2275a3761849cbee64b39519f", "build_date" : "2019-11-26T01:06:52.518245Z", "build_snapshot" : false, "lucene_version" : "8.3.0", "minimum_wire_compatibility_version" : "6.8.0", "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" }
Ingest Nodeとは?
Ingest Nodeとは、Elasticsearchのインデックスとしてドキュメントを登録する前に、前処理を行う仕組みです。
Ingest node | Elasticsearch Reference [7.5] | Elastic
インデックスへの登録やバルクAPIに割り込んで、ドキュメントを変換してから登録を行います。
概要や、Logstashとの違いは、こちらも参考に。
Ingest Node: Voxxed Luxembourg - Speaker Deck
Should I use Logstash or Elasticsearch ingest nodes? | Elastic Blog
Ingest Nodeはデフォルトで有効になっていて、前処理を行うプロセッサーをパイプラインとして定義することで使えるようになります。
ひとつのパイプライン中で、プロセッサーは複数利用することができ、いろいろな処理を行うことができます。
パイプラインは、ドキュメントの登録時にパラメーターで指定するものですが、インデックステンプレートにデフォルトで適用される
パイプラインを指定することもできるみたいです。
Index modules / Dynamic index settings / index.default_pipeline
パイプラインと、登録するプロセッサーの書き方はこちら。
Pipeline Definition | Elasticsearch Reference [7.5] | Elastic
Processors | Elasticsearch Reference [7.5] | Elastic
条件分岐や、エラーハンドリングもできます。
Conditional Execution in Pipelines | Elasticsearch Reference [7.5] | Elastic
Handling Failures in Pipelines | Elasticsearch Reference [7.5] | Elastic
Ingest Nodeで使えるプラグインもあったり、
Ingest Plugins | Elasticsearch Plugins and Integrations [7.5] | Elastic
また、パイプラインの登録や参照、削除、動作確認はREST APIで行うことができます。
Ingest APIs | Elasticsearch Reference [7.5] | Elastic
ドキュメントなどはこのくらいにして、実際に動かしてみましょう。
初めてのIngest Node
初めてのIngest Nodeということで。
まずは、Set Processorを使って、ドキュメントにフィールドを追加してみましょう。
Set Processor | Elasticsearch Reference [7.5] | Elastic
「hello-pipeline」というパイプライン名で、フィールド「label」を追加するパイプラインを登録してみます。
$ curl -XPUT -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/hello-pipeline -d ' { "description": "Hello! my first pipeline", "processors": [ { "set": { "field": "label", "value": "field added by pipeline" } } ] } '
動作確認してみましょう。Simulate pipeline APIを使って、確認することができます。
こんな感じでURLとドキュメントを指定して、実行。今回は、ドキュメントは2つ指定してあります。
$ curl -XPOST -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/hello-pipeline/_simulate?pretty -d ' { "docs": [ { "_source": { "field1": "hello" } }, { "_source": { "field1": "world" } } ] } '
結果は、こんな感じに。
{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "field1" : "hello", "label" : "field added by pipeline" }, "_ingest" : { "timestamp" : "2019-12-06T15:19:09.559123Z" } } }, { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "field1" : "world", "label" : "field added by pipeline" }, "_ingest" : { "timestamp" : "2019-12-06T15:19:09.559137Z" } } } ] }
「label」というフィールドが、指定した値(「field added by pipeline」)で追加されましたね。
今度は、ドキュメントを登録してみます。この時に、「pipeline」パラメーターで適用したいパイプラインを指定します。
$ curl -XPUT -H 'Content-Type: application/json' localhost:9200/index/_doc/1?pipeline=hello-pipeline -d ' { "field1": "world" } '
確認。
$ curl localhost:9200/index/_doc/1?pretty { "_index" : "index", "_type" : "_doc", "_id" : "1", "_version" : 1, "_seq_no" : 0, "_primary_term" : 1, "found" : true, "_source" : { "field1" : "world", "label" : "field added by pipeline" } }
こちらも、フィールドが追加されていることが確認できました。
使わなくなったパイプラインを、削除。
$ curl -XDELETE localhost:9200/_ingest/pipeline/hello-pipeline
ところで、Simulate pipeline APIは、パイプラインを実際に登録しなくても、パイプライン定義と対象のドキュメントを与えると
動作させることができます。こんな感じで。
$ curl -XPOST -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/_simulate?pretty -d ' { "pipeline": { "description": "Hello! my first pipeline", "processors": [ { "set": { "field": "label", "value": "field added by pipeline" } } ] }, "docs": [ { "_source": { "field1": "hello" } }, { "_source": { "field1": "world" } } ] } '
結果。
{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "field1" : "hello", "label" : "field added by pipeline" }, "_ingest" : { "timestamp" : "2019-12-06T15:28:03.302141Z" } } }, { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "field1" : "world", "label" : "field added by pipeline" }, "_ingest" : { "timestamp" : "2019-12-06T15:28:03.30215Z" } } } ] }
覚えておきましょう。
もうちょっと長いパイプラインを書く
次は、もうちょっと長いパイプラインを書いてみましょう。
お題として、ApacheのアクセスログをGrok Processorでパースしてあれこれしてみましょう。
Grok Processor | Elasticsearch Reference [7.5] | Elastic
次のようなアクセスログを対象にします。
127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] "GET /index.html HTTP/1.1" 200 11173 "-" "curl/7.58.0"
まあ、Apacheのアクセスログのデフォルトフォーマットなのですが。
LogFormat "%h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\"" combined
パイプラインは、こんな感じで書いてみました。
$ curl -XPUT -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline -d ' { "description": "parsing log pipeline", "processors": [ { "grok": { "field": "message", "patterns": [ "%{IPORHOST:source_address} - %{DATA:user_name} \\[%{HTTPDATE:access_time}\\] \"(?:%{WORD:http_request_method} %{DATA:url} HTTP/%{NUMBER:http_version}|-)?\" %{NUMBER:http_response_status_code:long} (?:%{NUMBER:http_response_body_bytes:long}|-)( \"%{DATA:http_request_referrer}\")?( \"%{DATA:user_agent}\")?" ] } } ], "on_failure": [ { "set": { "field": "error", "value": "access log parse failed" } } ] } '
Grokの書き方は、こんな感じでパターンとキャプチャするフィールド名、型を指定できます。
The syntax for reusing a grok pattern comes in three forms: %{SYNTAX:SEMANTIC}, %{SYNTAX}, %{SYNTAX:SEMANTIC:TYPE}.
確認。
$ curl -XPOST -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline/_simulate?pretty -d ' { "docs": [ { "_source": { "message": "127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] \"GET /index.html HTTP/1.1\" 200 11173 \"-\" \"curl/7.58.0\"" } } ] } '
こんな感じで、パースできました。
{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "http_request_referrer" : "-", "http_response_status_code" : 200, "user_name" : "-", "http_version" : "1.1", "http_request_method" : "GET", "message" : "127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] \"GET /index.html HTTP/1.1\" 200 11173 \"-\" \"curl/7.58.0\"", "url" : "/index.html", "http_response_body_bytes" : 11173, "source_address" : "127.0.0.1", "user_agent" : "curl/7.58.0", "access_time" : "05/Dec/2019:15:33:19 +0000" }, "_ingest" : { "timestamp" : "2019-12-06T15:41:51.702802Z" } } } ] }
また、今回は「on_failure」要素を入れてあるので
"on_failure": [ { "set": { "field": "error", "value": "access log parse failed" } } ]
パースに失敗するようなデータを入れると
$ curl -XPOST -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline/_simulate?pretty -d ' { "docs": [ { "_source": { "message": "bad pattern" } } ] } '
この定義では、Set Processorでフィールドが追加されます。
{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : "bad pattern", "error" : "access log parse failed" }, "_ingest" : { "timestamp" : "2019-12-06T15:46:58.738005Z" } } } ] }
先ほどの「on_failure」の例は、パイプライン全体として失敗した場合の扱いですが、こんな感じでプロセッサー単位に付けることも
できます。
$ curl -XPUT -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline -d ' { "description": "parsing log pipeline", "processors": [ { "grok": { "field": "message", "patterns": [ "%{IPORHOST:source_address} - %{DATA:user_name} \\[%{HTTPDATE:access_time}\\] \"(?:%{WORD:http_request_method} %{DATA:url} HTTP/%{NUMBER:http_version}|-)?\" %{NUMBER:http_response_status_code:long} (?:%{NUMBER:http_response_body_bytes:long}|-)( \"%{DATA:http_request_referrer}\")?( \"%{DATA:user_agent}\")?" ], "on_failure": [ { "set": { "field": "error", "value": "access log parse failed" } } ] } } ] } '
結果は同じなので、割愛。
ですが、この「on_failure」パイプラインに書くのか、プロセッサー単位に書くのかで、その後にさらにプロセッサーが続くはずだった
場合には、挙動が変わります。
違いは、次の2ページを見るとよいでしょう。
https://speakerdeck.com/elastic/ingest-node-voxxed-luxembourg?slide=29
https://speakerdeck.com/elastic/ingest-node-voxxed-luxembourg?slide=31
他にも、処理対象がなかった時に無視するかどうかの、「ignore_missing」などもあったりします。
ところで、こんな感じにパース後のフィールド名に「.」を入れてパースすると
$ curl -XPUT -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline -d ' { "description": "parsing log pipeline", "processors": [ { "grok": { "field": "message", "patterns": [ "%{IPORHOST:source_address} - %{DATA:user_name} \\[%{HTTPDATE:access_time}\\] \"(?:%{WORD:http_request.method} %{DATA:url} HTTP/%{NUMBER:http_request.version}|-)?\" %{NUMBER:http_response.status_code:long} (?:%{NUMBER:http_response.body_bytes:long}|-)( \"%{DATA:http_request.referrer}\")?( \"%{DATA:http_request.user_agent}\")?" ] } } ], "on_failure": [ { "set": { "field": "error", "value": "access log parse failed" } } ] } '
ネストした構造になります。
{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "http_response" : { "body_bytes" : 11173, "status_code" : 200 }, "user_name" : "-", "source_address" : "127.0.0.1", "http_request" : { "referrer" : "-", "method" : "GET", "version" : "1.1", "user_agent" : "curl/7.58.0" }, "message" : "127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] \"GET /index.html HTTP/1.1\" 200 11173 \"-\" \"curl/7.58.0\"", "url" : "/index.html", "access_time" : "05/Dec/2019:15:33:19 +0000" }, "_ingest" : { "timestamp" : "2019-12-06T15:44:44.967901Z" } } } ] }
さて、話を戻しまして。最初にパースした結果は、こんな感じでした。
"doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "http_request_referrer" : "-", "http_response_status_code" : 200, "user_name" : "-", "http_version" : "1.1", "http_request_method" : "GET", "message" : "127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] \"GET /index.html HTTP/1.1\" 200 11173 \"-\" \"curl/7.58.0\"", "url" : "/index.html", "http_response_body_bytes" : 11173, "source_address" : "127.0.0.1", "user_agent" : "curl/7.58.0", "access_time" : "05/Dec/2019:15:33:19 +0000" },
オリジナルの「message」というフィールドは、もう要らないかもしれませんね。Remove Processorで、削除してみましょう。
Remove Processor | Elasticsearch Reference [7.5] | Elastic
Remove Processorを追加して、パイプラインを再登録。
$ curl -XPUT -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline -d ' { "description": "parsing log pipeline", "processors": [ { "grok": { "field": "message", "patterns": [ "%{IPORHOST:source_address} - %{DATA:user_name} \\[%{HTTPDATE:access_time}\\] \"(?:%{WORD:http_request_method} %{DATA:url} HTTP/%{NUMBER:http_version}|-)?\" %{NUMBER:http_response_status_code:long} (?:%{NUMBER:http_response_body_bytes:long}|-)( \"%{DATA:http_request_referrer}\")?( \"%{DATA:user_agent}\")?" ] } }, { "remove": { "field": "message" } } ], "on_failure": [ { "set": { "field": "error", "value": "access log parse failed" } } ] } '
結果。
{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "http_request_referrer" : "-", "http_response_status_code" : 200, "user_name" : "-", "http_version" : "1.1", "http_request_method" : "GET", "url" : "/index.html", "http_response_body_bytes" : 11173, "source_address" : "127.0.0.1", "user_agent" : "curl/7.58.0", "access_time" : "05/Dec/2019:15:33:19 +0000" }, "_ingest" : { "timestamp" : "2019-12-06T16:05:59.137718Z" } } } ] }
「message」フィールドがなくなりました。
最後に、ちょっと乱暴ですが、ステータスコードが200だったら「result」に「OK」、400以上だったら「NG」とするSet Processorを
ifと共に追加してみましょう。
$ curl -XPUT -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline -d ' { "description": "parsing log pipeline", "processors": [ { "grok": { "field": "message", "patterns": [ "%{IPORHOST:source_address} - %{DATA:user_name} \\[%{HTTPDATE:access_time}\\] \"(?:%{WORD:http_request_method} %{DATA:url} HTTP/%{NUMBER:http_version}|-)?\" %{NUMBER:http_response_status_code:long} (?:%{NUMBER:http_response_body_bytes:long}|-)( \"%{DATA:http_request_referrer}\")?( \"%{DATA:user_agent}\")?" ] } }, { "remove": { "field": "message" } }, { "set": { "if": "ctx.http_response_status_code == 200", "field": "result", "value": "OK" } }, { "set": { "if": "ctx.http_response_status_code >= 400", "field": "result", "value": "NG" } } ], "on_failure": [ { "set": { "field": "error", "value": "access log parse failed" } } ] } '
確認。
$ curl -XPOST -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline/_simulate?pretty -d ' { "docs": [ { "_source": { "message": "127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] \"GET /index.html HTTP/1.1\" 200 11173 \"-\" \"curl/7.58.0\"" } } ] } '
「OK」が入りました。
{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "http_request_referrer" : "-", "http_response_status_code" : 200, "user_name" : "-", "http_version" : "1.1", "http_request_method" : "GET", "url" : "/index.html", "result" : "OK", "http_response_body_bytes" : 11173, "source_address" : "127.0.0.1", "user_agent" : "curl/7.58.0", "access_time" : "05/Dec/2019:15:33:19 +0000" }, "_ingest" : { "timestamp" : "2019-12-06T16:08:07.124479Z" } } } ] }
「400」や「503」を入れてみます。
$ curl -XPOST -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline/_simulate?pretty -d ' { "docs": [ { "_source": { "message": "127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] \"GET /index.html HTTP/1.1\" 400 11173 \"-\" \"curl/7.58.0\"" } } ] } ' $ curl -XPOST -H 'Content-Type: application/json' localhost:9200/_ingest/pipeline/parse-log-pipeline/_simulate?pretty -d ' { "docs": [ { "_source": { "message": "127.0.0.1 - - [05/Dec/2019:15:33:19 +0000] \"GET /index.html HTTP/1.1\" 503 11173 \"-\" \"curl/7.58.0\"" } } ] } '
それぞれの結果。
{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "http_request_referrer" : "-", "http_response_status_code" : 400, "user_name" : "-", "http_version" : "1.1", "http_request_method" : "GET", "url" : "/index.html", "result" : "NG", "http_response_body_bytes" : 11173, "source_address" : "127.0.0.1", "user_agent" : "curl/7.58.0", "access_time" : "05/Dec/2019:15:33:19 +0000" }, "_ingest" : { "timestamp" : "2019-12-06T16:08:57.00472Z" } } } ] } { "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "http_request_referrer" : "-", "http_response_status_code" : 503, "user_name" : "-", "http_version" : "1.1", "http_request_method" : "GET", "url" : "/index.html", "result" : "NG", "http_response_body_bytes" : 11173, "source_address" : "127.0.0.1", "user_agent" : "curl/7.58.0", "access_time" : "05/Dec/2019:15:33:19 +0000" }, "_ingest" : { "timestamp" : "2019-12-06T16:09:20.963558Z" } } } ] }
OKそうですね。
まとめ
ElasticsearchのIngest Nodeを、軽く試してみました。
最初はJSONで書かれたプロセッサーに面食らいましたが、ちゃんと見るとなんとか書けていけそうです。
エラーハンドリングなども合わせて、ちょっとずつ覚えていきましょう。