CLOVER🍀

That was when it all began.

LogstashのPipeline to Pipeline Communicationを試す

これは、なにをしたくて書いたもの?

Logstashのパイプラインを書いていると、だんだん設定ファイルが大きくなってきて、困るなぁと。

こういう時、どうしたらいいのだろうと調べたら、こういうのがあったので試してみることにしました。

Pipeline-to-Pipeline Communication | Logstash Reference [7.5] | Elastic

Pipeline to Pipeline Communication

Logstashのパイプラインでfilterを使い、変換処理やパース処理を書いていると、だんだん設定ファイルが大きくなってきます。

あんまり大きくなりすぎるのも見通しが悪くなるので嫌なのですが、includeみたいな方法で分割することはできなさそうです。

'include functionality' for logstash config files · Issue #5097 · elastic/logstash · GitHub

Multiple Pipelineで設定を分けてもいいのですが、パイプライン自体が分かれてしまうのが嫌な時もあります(Inputが増えるし)。

Multiple Pipelines | Logstash Reference [7.5] | Elastic

ひとつのパイプラインを分割したい時は、こちらを使うのが良さそうです。

Pipeline-to-Pipeline Communication | Logstash Reference [7.5] | Elastic

Multiple Pipelineを使っている時に、パイプラインからパイプラインへイベントを転送することができます。

あくまで、同一Logstashインスタンス内での話です。複数のLogstashインスタンス間の通信であれば、こちらのドキュメントを
参照してください。

Logstash-to-Logstash Communication | Logstash Reference [7.5] | Elastic

使い方をざっくり書くと、Outputでpipeline/send_toで転送先を指定し

output {
  pipeline { send_to => next }
}

別のパイプラインのInputのpipeline/addressで受け取るようにします。

input {
  pipeline { address => next }
}

Outputをif文で分岐させれば、メッセージの内容に応じて転送先を振り分けることもできますし

output {
  if [type] == apache {
    pipeline { send_to => weblogs }
  } else if [type] == system {
    pipeline { send_to => syslog }
  } else {
    pipeline { send_to => fallback }
  }
}

複数のパイプラインに転送することもできます。

output {
  pipeline { send_to => [foo, bar] }
}

この時、pipelines.ymlではそれぞれの設定に応じたpipeline.idを指定します。
/etc/logstash/pipelines.yml

- pipeline.id: main
  path.config: "/etc/logstash/conf.d/main.conf"
- pipeline.id: next
  path.config: "/etc/logstash/conf.d/next.conf"

ポイントをいくつか。

How it works

  • 転送先のパイプラインが利用できない場合は、出力がブロックされる
  • イベントを転送する際には、データのコピーが発生する
    • このため、パフォーマンス(ヒープの利用)に影響する
  • ダウンストリームのパイプラインの変更は、アップストリームのパイプラインに影響しない

Delivery guarantee

  • 少なくとも、1回の配送保証がある
    • ダウンストリームパイプラインが利用できない場合、出力がブロックされる
  • ensure_deliveryをfalseに設定すると、ダウンストリームパイプラインが利用できない場合、メッセージが破棄される
  • パイプラインは、起動中または再ロード中に利用可能であると見なされ、プラグインによりブロックされている場合は利用不可と見なされる

とまあ、前置きはこんな感じにして、とりあえず試してみましょう。

環境

今回の環境は、こちら。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 18.04.3 LTS
Release:    18.04
Codename:   bionic


$ java --version
openjdk 11.0.5 2019-10-15
OpenJDK Runtime Environment (build 11.0.5+10-post-Ubuntu-0ubuntu1.118.04)
OpenJDK 64-Bit Server VM (build 11.0.5+10-post-Ubuntu-0ubuntu1.118.04, mixed mode, sharing)

Logstashはaptでインストールを行い、バージョンは7.5.1とします。

お題

Http input pluginを使い、その内容によってパイプラインを振り分けてみましょう。

Http input plugin | Logstash Reference [7.5] | Elastic

まずはセットアップ

pipelines.ymlは、以下のデフォルトの状態とします。

$ grep -vE '^#|^ *$' /etc/logstash/pipelines.yml 
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"

まずは、こんな感じにシンプルなパイプラインを用意します。
/etc/logstash/conf.d/main.conf

input {
  http { }
}

output {
  stdout { }
}

Logstashを再起動して、動作確認。

$ sudo systemctl restart logstash
$ sudo journalctl -u logstash -f

JSONメッセージを送ると

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080 -d '{"message": "hello"}'

ログにこんな感じに出力されます。

^PJan 15 14:09:58 ubuntu1804.localdomain logstash[10683]: {
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:        "headers" => {
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:            "http_version" => "HTTP/1.1",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:             "http_accept" => "*/*",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:          "request_method" => "POST",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:            "request_path" => "/",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:               "http_host" => "localhost:8080",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:            "content_type" => "application/json",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:          "content_length" => "20"
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:     },
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:       "@version" => "1",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:        "message" => "hello",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:           "host" => "127.0.0.1",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:     "@timestamp" => 2020-01-15T14:09:58.497Z
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]: }

まずは、仕込みはOKです。

別のパイプラインに転送する

では、先ほどのパイプラインで受け取ったイベントを、別のパイプラインに転送してみましょう。

転送先のパイプライン設定は、今回は別のディレクトリに置くことにします。

$ sudo mkdir /etc/logstash/conf.d/dispatch-pipelines

ちなみに、ドキュメントにもあるように、pipeilnes.ymlにパイプライン定義を直接記述することもできますが、今回はやめておきます。

こんな感じのパイプラインを作成。
/etc/logstash/conf.d/dispatch-pipelines/foo-pipeline.conf

input {
  pipeline { address => foo }
}

filter {
  mutate {
    add_field => { "target_pipeline" => "foo" }
  }
}

output {
  stdout { }
}

「address」は「foo」とし、どのパイプラインが受けたかわかるようにMutate filterでフィールドを追加しました。

最初に用意したパイプライン設定は、Outputでpipeline/send_toを使用します。
/etc/logstash/conf.d/main.conf

input {
  http { }
}

output {
  pipeline { send_to => foo }
}

この時、send_toで指定する名前は、転送先(ダウンストリーム)のパイプラインのaddressで指定してある値と合わせてください。

pipelines.ymlは、このように設定。

$ grep -vE '^#|^ *$' /etc/logstash/pipelines.yml
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: foo-pipeline
  path.config: "/etc/logstash/conf.d/dispatch-pipelines/foo-pipeline.conf"

この時、pipeline.idがmainの方が*(ワイルドカード)設定ファイルを読んでいるからといって、ここにマッチするように転送先の
パイプラインを置くと、無限ループするので気をつけてください。

あくまで、別々のpipeline.idを与えましょう、と。

Logstashを再起動して、確認。

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080 -d '{"message": "hello"}'

フィールドが追加されました。

Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]: {
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:             "message" => "hello",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:            "@version" => "1",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:             "headers" => {
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:            "request_path" => "/",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:            "http_version" => "HTTP/1.1",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:          "content_length" => "20",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:            "content_type" => "application/json",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:               "http_host" => "localhost:8080",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:             "http_accept" => "*/*",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:          "request_method" => "POST"
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:     },
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:                "host" => "127.0.0.1",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:     "target_pipeline" => "foo",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:          "@timestamp" => 2020-01-15T14:26:24.474Z
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]: }

別のパイプラインに転送されたのが確認できましたね。

複数のパイプラインに転送する

今度は、複数のパイプラインに転送してみましょう。

先ほどの転送先のパイプライン設定をコピーして

$ sudo cp /etc/logstash/conf.d/dispatch-pipelines/foo-pipeline.conf /etc/logstash/conf.d/dispatch-pipelines/bar-pipeline.conf

「foo」と書いていた部分を、「bar」に変更します。
/etc/logstash/conf.d/dispatch-pipelines/bar-pipeline.conf

input {
  pipeline { address => bar }
}

filter {
  mutate {
    add_field => { "target_pipeline" => "bar" }
  }
}

output {
  stdout { }
}

アップストリームのパイプラインでは、転送先を配列で指定します。
/etc/logstash/conf.d/main.conf

input {
  http { }
}

output {
  pipeline { send_to => [foo, bar] }
}

pipeline.ymlにも追記。

$ grep -vE '^#|^ *$' /etc/logstash/pipelines.yml 
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: foo-pipeline
  path.config: "/etc/logstash/conf.d/dispatch-pipelines/foo-pipeline.conf"
- pipeline.id: bar-pipeline
  path.config: "/etc/logstash/conf.d/dispatch-pipelines/bar-pipeline.conf"

Logstashを再起動して、確認します。

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080 -d '{"message": "hello"}'

2つ分、出力されました。

Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]: {
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:             "message" => "hello",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:                "host" => "127.0.0.1",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:          "@timestamp" => 2020-01-15T14:33:47.107Z,
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "@version" => "1",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:     "target_pipeline" => "bar",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:             "headers" => {
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:          "request_method" => "POST",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:             "http_accept" => "*/*",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "content_type" => "application/json",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:          "content_length" => "20",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:               "http_host" => "localhost:8080",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "request_path" => "/",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "http_version" => "HTTP/1.1"
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:     }
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]: }
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]: {
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:             "message" => "hello",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:                "host" => "127.0.0.1",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:          "@timestamp" => 2020-01-15T14:33:47.107Z,
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "@version" => "1",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:     "target_pipeline" => "foo",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:             "headers" => {
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:          "request_method" => "POST",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:             "http_accept" => "*/*",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "content_type" => "application/json",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:          "content_length" => "20",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:               "http_host" => "localhost:8080",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "request_path" => "/",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "http_version" => "HTTP/1.1"
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:     }
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]: }

OKそうです。

転送先のパイプラインを、条件で振り分ける

最後に、転送先のパイプラインを条件で振り分けてみましょう。

fooパイプラインをコピーして、新しいパイプラインを作成。

$ sudo cp /etc/logstash/conf.d/dispatch-pipelines/foo-pipeline.conf /etc/logstash/conf.d/dispatch-pipelines/hoge-pipeline.conf

今度は、「foo」の部分を「hoge」にしましょうか。
/etc/logstash/conf.d/dispatch-pipelines/hoge-pipeline.conf

input {
  pipeline { address => hoge }
}

filter {
  mutate {
    add_field => { "target_pipeline" => "hoge" }
  }
}

output {
  stdout { }
}

転送先は、tagsフィールドで条件分岐することにします。
/etc/logstash/conf.d/main.conf

input {
  http { }
}

output {
  if "foo" in [tags] or "bar" in [tags] {
    pipeline { send_to => [foo, bar] }
  } else if "hoge" in [tags] {
    pipeline { send_to => hoge }
  } else {
    stdout { }
  }
}

tagsフィールドに「foo」または「bar」が入っていれば、foo、barの両方のパイプラインに転送、「hoge」が入っていればhogeパイプラインに
転送、それ以外はそのまま標準出力に書き出します。

  if "foo" in [tags] or "bar" in [tags] {
    pipeline { send_to => [foo, bar] }
  } else if "hoge" in [tags] {
    pipeline { send_to => hoge }
  } else {
    stdout { }
  }

pipelines.ymlにも追記。

$ grep -vE '^#|^ *$' /etc/logstash/pipelines.yml 
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: foo-pipeline
  path.config: "/etc/logstash/conf.d/dispatch-pipelines/foo-pipeline.conf"
- pipeline.id: bar-pipeline
  path.config: "/etc/logstash/conf.d/dispatch-pipelines/bar-pipeline.conf"
- pipeline.id: hoge-pipeline
  path.config: "/etc/logstash/conf.d/dispatch-pipelines/hoge-pipeline.conf"

Logstashを再起動して、確認してみます。

まずは、tagsなし。

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080 -d '{"message": "hello"}'

追加フィールドはありません。

Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]: {
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:           "host" => "127.0.0.1",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:     "@timestamp" => 2020-01-15T14:43:56.577Z,
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:        "message" => "hello",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:       "@version" => "1",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:        "headers" => {
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:            "http_version" => "HTTP/1.1",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:               "http_host" => "localhost:8080",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:            "request_path" => "/",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:          "request_method" => "POST",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:            "content_type" => "application/json",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:          "content_length" => "20",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:             "http_accept" => "*/*"
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:     }
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]: }

fooタグをつけてみましょう。

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080 -d '{"message": "hello", "tags": ["foo"]}'

fooとbarのパイプラインが実行されます。

Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]: {
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:             "message" => "hello",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:     "target_pipeline" => "foo",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "@version" => "1",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:             "headers" => {
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "http_version" => "HTTP/1.1",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:               "http_host" => "localhost:8080",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "request_path" => "/",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:          "request_method" => "POST",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "content_type" => "application/json",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:          "content_length" => "37",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:             "http_accept" => "*/*"
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:     },
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:                "host" => "127.0.0.1",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:          "@timestamp" => 2020-01-15T14:44:51.068Z,
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:                "tags" => [
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:         [0] "foo"
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:     ]
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]: }
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]: {
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:             "message" => "hello",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:     "target_pipeline" => "bar",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "@version" => "1",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:             "headers" => {
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "http_version" => "HTTP/1.1",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:               "http_host" => "localhost:8080",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "request_path" => "/",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:          "request_method" => "POST",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "content_type" => "application/json",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:          "content_length" => "37",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:             "http_accept" => "*/*"
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:     },
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:                "host" => "127.0.0.1",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:          "@timestamp" => 2020-01-15T14:44:51.068Z,
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:                "tags" => [
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:         [0] "foo"
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:     ]
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]: }

barタグの指定は、割愛します。

hogeタグを指定。

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080 -d '{"message": "hello", "tags": ["hoge"]}'

hogeパイプラインが実行されます。

Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]: {
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:             "message" => "hello",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:     "target_pipeline" => "hoge",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:            "@version" => "1",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:             "headers" => {
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:            "http_version" => "HTTP/1.1",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:               "http_host" => "localhost:8080",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:            "request_path" => "/",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:          "request_method" => "POST",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:            "content_type" => "application/json",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:          "content_length" => "38",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:             "http_accept" => "*/*"
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:     },
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:                "host" => "127.0.0.1",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:          "@timestamp" => 2020-01-15T14:45:22.827Z,
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:                "tags" => [
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:         [0] "hoge"
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:     ]
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]: }

OKそうですね。

こんな感じで、あるパイプラインから別のパイプラインに転送できることを確認できました。

ソース?

ちょっと、関連するソースコードを見てみました。

このあたりでしょうか。

https://github.com/elastic/logstash/blob/v7.5.1/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineInput.java

https://github.com/elastic/logstash/blob/v7.5.1/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineOutput.java

https://github.com/elastic/logstash/blob/v7.5.1/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java

https://github.com/elastic/logstash/blob/v7.5.1/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb

https://github.com/elastic/logstash/blob/v7.5.1/logstash-core/lib/logstash/plugins/builtin/pipeline/output.rb

転送時にイベントがコピーされるというのは、ここのことみたいですね。

https://github.com/elastic/logstash/blob/v7.5.1/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java#L41

配送保証は、このあたりに影響するみたいですね。

https://github.com/elastic/logstash/blob/v7.5.1/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java#L47