CLOVER🍀

That was when it all began.

Ansible Roleを開発、テストするためのMoleculeを試す

これは、なにをしたくて書いたもの?

AnsibleのRoleに対して、テストが書きたいなぁと思いまして。

調べてみたら、Moleculeというのが良さそうな感じだったので、ちょっと試してみることにしました。

Molecule — Molecule 2.22 documentation

Molecule入門

Molecule 備忘録 - Qiita

Molecule?

Ansible Roleを開発したり、テストするためのものみたいです。

Molecule — Molecule 2.22 documentation

Molecule is designed to aid in the development and testing of Ansible roles.

複数のインスタンス、OS、仮想化プロバイダー、テストに関するサポートを提供します、と。

Molecule provides support for testing with multiple instances, operating systems and distributions, virtualization providers, test frameworks and testing scenarios.

AnsibleのOrganizationにあるのもポイントですね。

GitHub - ansible/molecule: Molecule aids in the development and testing of Ansible roles.

現在のバージョンは、2.22です。

とりあえず、インストールしてGetting Startedに沿って試してみましょう。

環境

インストールに必要な情報は、こちら。

Installation — Molecule 2.22 documentation

OSは、CentOS 7かUbuntu 16.xらしいですが…今回はこんな感じでいきます。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 18.04.3 LTS
Release:    18.04
Codename:   bionic

$ docker version
Client: Docker Engine - Community
 Version:           19.03.5
 API version:       1.40
 Go version:        go1.12.12
 Git commit:        633a0ea838
 Built:             Wed Nov 13 07:29:52 2019
 OS/Arch:           linux/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.5
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.12.12
  Git commit:       633a0ea838
  Built:            Wed Nov 13 07:28:22 2019
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          1.2.10
  GitCommit:        b34a5c8af56e510852c35414db4c1f4fa6172339
 runc:
  Version:          1.0.0-rc8+dev
  GitCommit:        3e425f80a8c931f88e6d94a8c831b9d5aa481657
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683


$ python3 -V
Python 3.6.9


$ pip3 -V
pip 9.0.1 from /path/to/venv/lib/python3.6/site-packages (python 3.6)

Moleculeのインストール。

$ pip3 install molecule

Driverとして、デフォルトでDockerが使用されるので、こちらもインストール。

$ pip3 install molecule[docker]

molecule[docker]のインストールについては書いてないな?と思ったら、Getting Startedに書いてありました…。

Getting Started Guide — Molecule 2.22 documentation

バージョン。Ansibleも一緒にインストールされます。

$ pip3 freeze | grep -E 'ansible|molecule|docker|testinfra'
ansible==2.9.3
ansible-lint==4.2.0
docker==4.1.0
molecule==2.22
testinfra==3.4.0

また、テストフレームワークとしては、Testinfraが使われます。

Testinfra test your infrastructure — testinfra 3.4.1.dev0+gd7a7512.d20200105 documentation

Driverには、Docker以外にもEC2、Azure、GCEなど、いろいろあるようです。

Configuration / Driver

お題と確認

今回は、Ubuntu LinuxにApacheをインストールするRoleを作って、テストを動かすことをゴールにしてみます。

基本は、Getting Startedに沿っていく形で。

Getting Started Guide — Molecule 2.22 documentation

まずは、新しいRoleを作成します。

Creating a new role

「molecule init role」で作成します。Role名は、「apache」にしました。

$ molecule init role -r apache

なお、すでに存在するRoleにMoleculeを追加する場合は、以下のコマンドになるようです。

molecule init scenario -r my-role-name

作成されたファイルは、こんな感じです。

$ find apache -type f
apache/handlers/main.yml
apache/README.md
apache/vars/main.yml
apache/meta/main.yml
apache/defaults/main.yml
apache/tasks/main.yml
apache/molecule/default/playbook.yml
apache/molecule/default/tests/__pycache__/test_default.cpython-36.pyc
apache/molecule/default/tests/test_default.py
apache/molecule/default/Dockerfile.j2
apache/molecule/default/molecule.yml
apache/molecule/default/INSTALL.rst
apache/.yamllint

Ansible Roleと、Moleculeに関連するファイルがそれぞれできた感じですね。Moleculeに関連するファイルは、Role内に納まっています。

Molecule側のファイルを、ちょっと見てみましょう。

The Scenario Layout

moleculeディレクトリ配下の、「default」というのはシナリオの名前です。

Molecule Scenarios

Dockerfileのテンプレート。
apache/molecule/default/Dockerfile.j2

# Molecule managed

{% if item.registry is defined %}
FROM {{ item.registry.url }}/{{ item.image }}
{% else %}
FROM {{ item.image }}
{% endif %}

{% if item.env is defined %}
{% for var, value in item.env.items() %}
{% if value %}
ENV {{ var }} {{ value }}
{% endif %}
{% endfor %}
{% endif %}

RUN if [ $(command -v apt-get) ]; then apt-get update && apt-get install -y python sudo bash ca-certificates iproute2 && apt-get clean; \
    elif [ $(command -v dnf) ]; then dnf makecache && dnf --assumeyes install python sudo python-devel python*-dnf bash iproute && dnf clean all; \
    elif [ $(command -v yum) ]; then yum makecache fast && yum install -y python sudo yum-plugin-ovl bash iproute && sed -i 's/plugins=0/plugins=1/g' /etc/yum.conf && yum clean all; \
    elif [ $(command -v zypper) ]; then zypper refresh && zypper install -y python sudo bash python-xml iproute2 && zypper clean -a; \
    elif [ $(command -v apk) ]; then apk update && apk add --no-cache python sudo bash ca-certificates; \
    elif [ $(command -v xbps-install) ]; then xbps-install -Syu && xbps-install -y python sudo bash ca-certificates iproute2 && xbps-remove -O; fi

Molecureの設定ファイル。
apache/molecule/default/molecule.yml

---
dependency:
  name: galaxy
driver:
  name: docker
lint:
  name: yamllint
platforms:
  - name: instance
    image: centos:7
provisioner:
  name: ansible
  lint:
    name: ansible-lint
verifier:
  name: testinfra
  lint:
    name: flake8

Playbook。
apache/molecule/default/playbook.yml

---
- name: Converge
  hosts: all
  roles:
    - role: apache

テストコード。
apache/molecule/default/tests/test_default.py

import os

import testinfra.utils.ansible_runner

testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
    os.environ['MOLECULE_INVENTORY_FILE']
).get_hosts('all')


def test_hosts_file(host):
    f = host.file('/etc/hosts')

    assert f.exists
    assert f.user == 'root'
    assert f.group == 'root'

ざっと、こんな感じです。

とりあえず、Role内に移動。

$ cd apache

DriverはDockerとなっていますが、OSのイメージは今回はUbuntu Linux 18.04に変更します。
molecule.yml

driver:
  name: docker
lint:
  name: yamllint
platforms:
  - name: instance
    image: ubuntu:18.04

「molecule create」で、インスタンスを作成。今回は、DriverにDockerを使用しているので、Dockerコンテナが立ち上がります。

$ molecule create

インスタンスは、「molecule list」で確認することができます。

$ molecule list
--> Validating schema /path/to/apache/molecule/default/molecule.yml.
Validation completed successfully.
Instance Name    Driver Name    Provisioner Name    Scenario Name    Created    Converged
---------------  -------------  ------------------  ---------------  ---------  -----------
instance         docker         ansible             default          true       false

Dockerコンテナなので、dockerコマンドでも確認できます。

$ docker container ps
CONTAINER ID        IMAGE                         COMMAND                  CREATED             STATUS              PORTS               NAMES
3bf5b879b166        molecule_local/ubuntu:18.04   "bash -c 'while true…"   14 seconds ago      Up 10 seconds                           instance

イメージ名は、こんな感じに。

$ docker image ls
REPOSITORY                         TAG                 IMAGE ID            CREATED             SIZE
molecule_local/ubuntu              18.04               304dca78c3df        3 minutes ago       132MB
...

このインスタンスにログインする場合は、「molecule login」を使います。

$ molecule login

コンテナの中に入れます。

root@instance:/# uname -a
Linux instance 4.15.0-74-generic #84-Ubuntu SMP Thu Dec 19 08:06:28 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux

このインスタンスを破棄するには、「molecule destroy」で。

$ molecule destroy

ここで本来のRoleの方に、Apacheをインストールするタスクを書いてみましょう。
tasks/main.yml

---
- name: install apache2
  apt:
    name: apache2
    state: present

「molecule converge」を実行すると、「molecule create」に加えてPlaybookの実行まで行ってくれます。

$ molecule converge

こんな感じですね。

--> Scenario: 'default'
--> Action: 'converge'
    
    PLAY [Converge] ****************************************************************
    
    TASK [Gathering Facts] *********************************************************
[DEPRECATION WARNING]: Distribution Ubuntu 18.04 on host instance should use 
/usr/bin/python3, but is using /usr/bin/python for backward compatibility with 
prior Ansible releases. A future Ansible release will default to using the 
discovered platform python for this host. See https://docs.ansible.com/ansible/
2.9/reference_appendices/interpreter_discovery.html for more information. This 
feature will be removed in version 2.12. Deprecation warnings can be disabled 
by setting deprecation_warnings=False in ansible.cfg.
    ok: [instance]
    
    TASK [apache : install apache2] ************************************************
[WARNING]: Updating cache and auto-installing missing dependency: python-apt

    changed: [instance]
    
    PLAY RECAP *********************************************************************
    instance                   : ok=2    changed=1    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0
    

これでコンテナにログインすると、Apacheがインストールされた状態を確認することができます。

次に、テストをしてみましょう。

「molecule init」時に作成されたテストコードを、こんな感じに修正。
molecule/default/tests/test_default.py

import os

import testinfra.utils.ansible_runner

testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
    os.environ['MOLECULE_INVENTORY_FILE']
).get_hosts('all')

def test_apache_is_installed(host):
    apache2 = host.package('apache2')
    assert apache2.is_installed

Apacheがインストールされたことを確認するテストにしました。

テストは、「molecule verify」で実行します。

$ molecule verify

ログ。

collected 1 item                                                               
    
    tests/test_default.py .                                                  [100%]
    
    ============================== 1 passed in 3.37s ===============================
Verifier completed successfully.

ちゃんとテストが動作しているか確認するために、失敗するテストにしてみましょう。

def test_apache_is_installed(host):
    apache2 = host.package('apache2')
    assert apache2.is_installed == False

確認。

collected 1 item                                                               
    
    tests/test_default.py F                                                  [100%]
    
    =================================== FAILURES ===================================
    _________________ test_apache_is_installed[ansible://instance] _________________
    
    host = <testinfra.host.Host object at 0x7fde1da8af28>
    
        def test_apache_is_installed(host):
            apache2 = host.package('apache2')
    >       assert apache2.is_installed == False
    E       assert True == False
    E        +  where True = <package apache2>.is_installed
    
    tests/test_default.py:11: AssertionError
    ============================== 1 failed in 3.08s ===============================

テストを実行してくれてそうですね。

使い終わったインスタンスの破棄は、やっぱり「molecule destroy」で。

$ molecule destroy

最後にここまで一気に実行するには、「molecule test」を実行します。

$ molecule test

デフォルトで作成したRoleのままだと、metaがlintでエラーになってまうので、エラーメッセージを読んで修正しておきましょう。

--> Scenario: 'default'
--> Action: 'lint'
--> Executing Yamllint on files found in /path/to/apache/...
Lint completed successfully.
--> Executing Flake8 on files found in /path/to/apache/molecule/default/tests/...
Lint completed successfully.
--> Executing Ansible Lint on /path/to/apache/molecule/default/playbook.yml...
    [701] Role info should contain platforms
    meta/main.yml:1
    {'meta/main.yml': {'galaxy_info': {'author': 'your name', 'description': 'your description', 'company': 'your company (optional)', 'license': 'license (GPLv2, CC-BY, etc)', 'min_ansible_version': 1.2, 'galaxy_tags': [], '__line__': 2, '__file__': '/path/to/apache/meta/main.yml'}, 'dependencies': [], '__line__': 1, '__file__': '/path/to/apache/meta/main.yml', 'skipped_rules': []}}
    
    [703] Should change default metadata: author
    meta/main.yml:1
    {'meta/main.yml': {'galaxy_info': {'author': 'your name', 'description': 'your description', 'company': 'your company (optional)', 'license': 'license (GPLv2, CC-BY, etc)', 'min_ansible_version': 1.2, 'galaxy_tags': [], '__line__': 2, '__file__': '/path/to/apache/meta/main.yml'}, 'dependencies': [], '__line__': 1, '__file__': '/path/to/apache/meta/main.yml', 'skipped_rules': []}}
    
    [703] Should change default metadata: description
    meta/main.yml:1
    {'meta/main.yml': {'galaxy_info': {'author': 'your name', 'description': 'your description', 'company': 'your company (optional)', 'license': 'license (GPLv2, CC-BY, etc)', 'min_ansible_version': 1.2, 'galaxy_tags': [], '__line__': 2, '__file__': '/path/to/apache/meta/main.yml'}, 'dependencies': [], '__line__': 1, '__file__': '/path/to/apache/meta/main.yml', 'skipped_rules': []}}
    
    [703] Should change default metadata: company
    meta/main.yml:1
    {'meta/main.yml': {'galaxy_info': {'author': 'your name', 'description': 'your description', 'company': 'your company (optional)', 'license': 'license (GPLv2, CC-BY, etc)', 'min_ansible_version': 1.2, 'galaxy_tags': [], '__line__': 2, '__file__': '/path/to/apache/meta/main.yml'}, 'dependencies': [], '__line__': 1, '__file__': '/path/to/apache/meta/main.yml', 'skipped_rules': []}}
    
    [703] Should change default metadata: license
    meta/main.yml:1
    {'meta/main.yml': {'galaxy_info': {'author': 'your name', 'description': 'your description', 'company': 'your company (optional)', 'license': 'license (GPLv2, CC-BY, etc)', 'min_ansible_version': 1.2, 'galaxy_tags': [], '__line__': 2, '__file__': '/path/to/apache/meta/main.yml'}, 'dependencies': [], '__line__': 1, '__file__': '/path/to/apache/meta/main.yml', 'skipped_rules': []}}

各コマンドでどんなことが実行されるのかは、実行時にmatrixが出力されるので、こちらで確認できます。

「molecule test」だと、こうなります。

--> Test matrix
    
└── default
    ├── lint
    ├── dependency
    ├── cleanup
    ├── destroy
    ├── syntax
    ├── create
    ├── prepare
    ├── converge
    ├── idempotence
    ├── side_effect
    ├── verify
    ├── cleanup
    └── destroy

コマンドのリストは、こちら。

Usage — Molecule 2.22 documentation

デバッグログを出力したかったら、「--debug」オプションを使います。

$ molecule --debug test

とりあえず、簡単な使い方は押さえられた感じでしょうか?

LogstashのPipeline to Pipeline Communicationを試す

これは、なにをしたくて書いたもの?

Logstashのパイプラインを書いていると、だんだん設定ファイルが大きくなってきて、困るなぁと。

こういう時、どうしたらいいのだろうと調べたら、こういうのがあったので試してみることにしました。

Pipeline-to-Pipeline Communication | Logstash Reference [7.5] | Elastic

Pipeline to Pipeline Communication

Logstashのパイプラインでfilterを使い、変換処理やパース処理を書いていると、だんだん設定ファイルが大きくなってきます。

あんまり大きくなりすぎるのも見通しが悪くなるので嫌なのですが、includeみたいな方法で分割することはできなさそうです。

'include functionality' for logstash config files · Issue #5097 · elastic/logstash · GitHub

Multiple Pipelineで設定を分けてもいいのですが、パイプライン自体が分かれてしまうのが嫌な時もあります(Inputが増えるし)。

Multiple Pipelines | Logstash Reference [7.5] | Elastic

ひとつのパイプラインを分割したい時は、こちらを使うのが良さそうです。

Pipeline-to-Pipeline Communication | Logstash Reference [7.5] | Elastic

Multiple Pipelineを使っている時に、パイプラインからパイプラインへイベントを転送することができます。

あくまで、同一Logstashインスタンス内での話です。複数のLogstashインスタンス間の通信であれば、こちらのドキュメントを
参照してください。

Logstash-to-Logstash Communication | Logstash Reference [7.5] | Elastic

使い方をざっくり書くと、Outputでpipeline/send_toで転送先を指定し

output {
  pipeline { send_to => next }
}

別のパイプラインのInputのpipeline/addressで受け取るようにします。

input {
  pipeline { address => next }
}

Outputをif文で分岐させれば、メッセージの内容に応じて転送先を振り分けることもできますし

output {
  if [type] == apache {
    pipeline { send_to => weblogs }
  } else if [type] == system {
    pipeline { send_to => syslog }
  } else {
    pipeline { send_to => fallback }
  }
}

複数のパイプラインに転送することもできます。

output {
  pipeline { send_to => [foo, bar] }
}

この時、pipelines.ymlではそれぞれの設定に応じたpipeline.idを指定します。
/etc/logstash/pipelines.yml

- pipeline.id: main
  path.config: "/etc/logstash/conf.d/main.conf"
- pipeline.id: next
  path.config: "/etc/logstash/conf.d/next.conf"

ポイントをいくつか。

How it works

  • 転送先のパイプラインが利用できない場合は、出力がブロックされる
  • イベントを転送する際には、データのコピーが発生する
    • このため、パフォーマンス(ヒープの利用)に影響する
  • ダウンストリームのパイプラインの変更は、アップストリームのパイプラインに影響しない

Delivery guarantee

  • 少なくとも、1回の配送保証がある
    • ダウンストリームパイプラインが利用できない場合、出力がブロックされる
  • ensure_deliveryをfalseに設定すると、ダウンストリームパイプラインが利用できない場合、メッセージが破棄される
  • パイプラインは、起動中または再ロード中に利用可能であると見なされ、プラグインによりブロックされている場合は利用不可と見なされる

とまあ、前置きはこんな感じにして、とりあえず試してみましょう。

環境

今回の環境は、こちら。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 18.04.3 LTS
Release:    18.04
Codename:   bionic


$ java --version
openjdk 11.0.5 2019-10-15
OpenJDK Runtime Environment (build 11.0.5+10-post-Ubuntu-0ubuntu1.118.04)
OpenJDK 64-Bit Server VM (build 11.0.5+10-post-Ubuntu-0ubuntu1.118.04, mixed mode, sharing)

Logstashはaptでインストールを行い、バージョンは7.5.1とします。

お題

Http input pluginを使い、その内容によってパイプラインを振り分けてみましょう。

Http input plugin | Logstash Reference [7.5] | Elastic

まずはセットアップ

pipelines.ymlは、以下のデフォルトの状態とします。

$ grep -vE '^#|^ *$' /etc/logstash/pipelines.yml 
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"

まずは、こんな感じにシンプルなパイプラインを用意します。
/etc/logstash/conf.d/main.conf

input {
  http { }
}

output {
  stdout { }
}

Logstashを再起動して、動作確認。

$ sudo systemctl restart logstash
$ sudo journalctl -u logstash -f

JSONメッセージを送ると

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080 -d '{"message": "hello"}'

ログにこんな感じに出力されます。

^PJan 15 14:09:58 ubuntu1804.localdomain logstash[10683]: {
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:        "headers" => {
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:            "http_version" => "HTTP/1.1",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:             "http_accept" => "*/*",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:          "request_method" => "POST",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:            "request_path" => "/",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:               "http_host" => "localhost:8080",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:            "content_type" => "application/json",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:          "content_length" => "20"
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:     },
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:       "@version" => "1",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:        "message" => "hello",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:           "host" => "127.0.0.1",
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]:     "@timestamp" => 2020-01-15T14:09:58.497Z
Jan 15 14:09:58 ubuntu1804.localdomain logstash[10683]: }

まずは、仕込みはOKです。

別のパイプラインに転送する

では、先ほどのパイプラインで受け取ったイベントを、別のパイプラインに転送してみましょう。

転送先のパイプライン設定は、今回は別のディレクトリに置くことにします。

$ sudo mkdir /etc/logstash/conf.d/dispatch-pipelines

ちなみに、ドキュメントにもあるように、pipeilnes.ymlにパイプライン定義を直接記述することもできますが、今回はやめておきます。

こんな感じのパイプラインを作成。
/etc/logstash/conf.d/dispatch-pipelines/foo-pipeline.conf

input {
  pipeline { address => foo }
}

filter {
  mutate {
    add_field => { "target_pipeline" => "foo" }
  }
}

output {
  stdout { }
}

「address」は「foo」とし、どのパイプラインが受けたかわかるようにMutate filterでフィールドを追加しました。

最初に用意したパイプライン設定は、Outputでpipeline/send_toを使用します。
/etc/logstash/conf.d/main.conf

input {
  http { }
}

output {
  pipeline { send_to => foo }
}

この時、send_toで指定する名前は、転送先(ダウンストリーム)のパイプラインのaddressで指定してある値と合わせてください。

pipelines.ymlは、このように設定。

$ grep -vE '^#|^ *$' /etc/logstash/pipelines.yml
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: foo-pipeline
  path.config: "/etc/logstash/conf.d/dispatch-pipelines/foo-pipeline.conf"

この時、pipeline.idがmainの方が*(ワイルドカード)設定ファイルを読んでいるからといって、ここにマッチするように転送先の
パイプラインを置くと、無限ループするので気をつけてください。

あくまで、別々のpipeline.idを与えましょう、と。

Logstashを再起動して、確認。

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080 -d '{"message": "hello"}'

フィールドが追加されました。

Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]: {
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:             "message" => "hello",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:            "@version" => "1",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:             "headers" => {
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:            "request_path" => "/",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:            "http_version" => "HTTP/1.1",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:          "content_length" => "20",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:            "content_type" => "application/json",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:               "http_host" => "localhost:8080",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:             "http_accept" => "*/*",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:          "request_method" => "POST"
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:     },
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:                "host" => "127.0.0.1",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:     "target_pipeline" => "foo",
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]:          "@timestamp" => 2020-01-15T14:26:24.474Z
Jan 15 14:26:24 ubuntu1804.localdomain logstash[11342]: }

別のパイプラインに転送されたのが確認できましたね。

複数のパイプラインに転送する

今度は、複数のパイプラインに転送してみましょう。

先ほどの転送先のパイプライン設定をコピーして

$ sudo cp /etc/logstash/conf.d/dispatch-pipelines/foo-pipeline.conf /etc/logstash/conf.d/dispatch-pipelines/bar-pipeline.conf

「foo」と書いていた部分を、「bar」に変更します。
/etc/logstash/conf.d/dispatch-pipelines/bar-pipeline.conf

input {
  pipeline { address => bar }
}

filter {
  mutate {
    add_field => { "target_pipeline" => "bar" }
  }
}

output {
  stdout { }
}

アップストリームのパイプラインでは、転送先を配列で指定します。
/etc/logstash/conf.d/main.conf

input {
  http { }
}

output {
  pipeline { send_to => [foo, bar] }
}

pipeline.ymlにも追記。

$ grep -vE '^#|^ *$' /etc/logstash/pipelines.yml 
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: foo-pipeline
  path.config: "/etc/logstash/conf.d/dispatch-pipelines/foo-pipeline.conf"
- pipeline.id: bar-pipeline
  path.config: "/etc/logstash/conf.d/dispatch-pipelines/bar-pipeline.conf"

Logstashを再起動して、確認します。

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080 -d '{"message": "hello"}'

2つ分、出力されました。

Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]: {
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:             "message" => "hello",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:                "host" => "127.0.0.1",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:          "@timestamp" => 2020-01-15T14:33:47.107Z,
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "@version" => "1",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:     "target_pipeline" => "bar",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:             "headers" => {
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:          "request_method" => "POST",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:             "http_accept" => "*/*",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "content_type" => "application/json",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:          "content_length" => "20",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:               "http_host" => "localhost:8080",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "request_path" => "/",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "http_version" => "HTTP/1.1"
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:     }
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]: }
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]: {
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:             "message" => "hello",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:                "host" => "127.0.0.1",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:          "@timestamp" => 2020-01-15T14:33:47.107Z,
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "@version" => "1",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:     "target_pipeline" => "foo",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:             "headers" => {
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:          "request_method" => "POST",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:             "http_accept" => "*/*",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "content_type" => "application/json",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:          "content_length" => "20",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:               "http_host" => "localhost:8080",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "request_path" => "/",
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:            "http_version" => "HTTP/1.1"
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]:     }
Jan 15 14:33:47 ubuntu1804.localdomain logstash[11457]: }

OKそうです。

転送先のパイプラインを、条件で振り分ける

最後に、転送先のパイプラインを条件で振り分けてみましょう。

fooパイプラインをコピーして、新しいパイプラインを作成。

$ sudo cp /etc/logstash/conf.d/dispatch-pipelines/foo-pipeline.conf /etc/logstash/conf.d/dispatch-pipelines/hoge-pipeline.conf

今度は、「foo」の部分を「hoge」にしましょうか。
/etc/logstash/conf.d/dispatch-pipelines/hoge-pipeline.conf

input {
  pipeline { address => hoge }
}

filter {
  mutate {
    add_field => { "target_pipeline" => "hoge" }
  }
}

output {
  stdout { }
}

転送先は、tagsフィールドで条件分岐することにします。
/etc/logstash/conf.d/main.conf

input {
  http { }
}

output {
  if "foo" in [tags] or "bar" in [tags] {
    pipeline { send_to => [foo, bar] }
  } else if "hoge" in [tags] {
    pipeline { send_to => hoge }
  } else {
    stdout { }
  }
}

tagsフィールドに「foo」または「bar」が入っていれば、foo、barの両方のパイプラインに転送、「hoge」が入っていればhogeパイプラインに
転送、それ以外はそのまま標準出力に書き出します。

  if "foo" in [tags] or "bar" in [tags] {
    pipeline { send_to => [foo, bar] }
  } else if "hoge" in [tags] {
    pipeline { send_to => hoge }
  } else {
    stdout { }
  }

pipelines.ymlにも追記。

$ grep -vE '^#|^ *$' /etc/logstash/pipelines.yml 
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: foo-pipeline
  path.config: "/etc/logstash/conf.d/dispatch-pipelines/foo-pipeline.conf"
- pipeline.id: bar-pipeline
  path.config: "/etc/logstash/conf.d/dispatch-pipelines/bar-pipeline.conf"
- pipeline.id: hoge-pipeline
  path.config: "/etc/logstash/conf.d/dispatch-pipelines/hoge-pipeline.conf"

Logstashを再起動して、確認してみます。

まずは、tagsなし。

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080 -d '{"message": "hello"}'

追加フィールドはありません。

Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]: {
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:           "host" => "127.0.0.1",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:     "@timestamp" => 2020-01-15T14:43:56.577Z,
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:        "message" => "hello",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:       "@version" => "1",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:        "headers" => {
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:            "http_version" => "HTTP/1.1",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:               "http_host" => "localhost:8080",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:            "request_path" => "/",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:          "request_method" => "POST",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:            "content_type" => "application/json",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:          "content_length" => "20",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:             "http_accept" => "*/*"
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]:     }
Jan 15 14:43:56 ubuntu1804.localdomain logstash[11598]: }

fooタグをつけてみましょう。

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080 -d '{"message": "hello", "tags": ["foo"]}'

fooとbarのパイプラインが実行されます。

Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]: {
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:             "message" => "hello",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:     "target_pipeline" => "foo",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "@version" => "1",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:             "headers" => {
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "http_version" => "HTTP/1.1",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:               "http_host" => "localhost:8080",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "request_path" => "/",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:          "request_method" => "POST",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "content_type" => "application/json",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:          "content_length" => "37",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:             "http_accept" => "*/*"
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:     },
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:                "host" => "127.0.0.1",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:          "@timestamp" => 2020-01-15T14:44:51.068Z,
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:                "tags" => [
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:         [0] "foo"
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:     ]
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]: }
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]: {
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:             "message" => "hello",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:     "target_pipeline" => "bar",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "@version" => "1",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:             "headers" => {
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "http_version" => "HTTP/1.1",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:               "http_host" => "localhost:8080",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "request_path" => "/",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:          "request_method" => "POST",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:            "content_type" => "application/json",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:          "content_length" => "37",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:             "http_accept" => "*/*"
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:     },
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:                "host" => "127.0.0.1",
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:          "@timestamp" => 2020-01-15T14:44:51.068Z,
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:                "tags" => [
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:         [0] "foo"
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]:     ]
Jan 15 14:44:51 ubuntu1804.localdomain logstash[11598]: }

barタグの指定は、割愛します。

hogeタグを指定。

$ curl -XPOST -H 'Content-Type: application/json' localhost:8080 -d '{"message": "hello", "tags": ["hoge"]}'

hogeパイプラインが実行されます。

Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]: {
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:             "message" => "hello",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:     "target_pipeline" => "hoge",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:            "@version" => "1",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:             "headers" => {
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:            "http_version" => "HTTP/1.1",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:               "http_host" => "localhost:8080",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:            "request_path" => "/",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:          "request_method" => "POST",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:            "content_type" => "application/json",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:          "content_length" => "38",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:         "http_user_agent" => "curl/7.58.0",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:             "http_accept" => "*/*"
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:     },
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:                "host" => "127.0.0.1",
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:          "@timestamp" => 2020-01-15T14:45:22.827Z,
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:                "tags" => [
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:         [0] "hoge"
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]:     ]
Jan 15 14:45:23 ubuntu1804.localdomain logstash[11598]: }

OKそうですね。

こんな感じで、あるパイプラインから別のパイプラインに転送できることを確認できました。

ソース?

ちょっと、関連するソースコードを見てみました。

このあたりでしょうか。

https://github.com/elastic/logstash/blob/v7.5.1/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineInput.java

https://github.com/elastic/logstash/blob/v7.5.1/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineOutput.java

https://github.com/elastic/logstash/blob/v7.5.1/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java

https://github.com/elastic/logstash/blob/v7.5.1/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb

https://github.com/elastic/logstash/blob/v7.5.1/logstash-core/lib/logstash/plugins/builtin/pipeline/output.rb

転送時にイベントがコピーされるというのは、ここのことみたいですね。

https://github.com/elastic/logstash/blob/v7.5.1/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java#L41

配送保証は、このあたりに影響するみたいですね。

https://github.com/elastic/logstash/blob/v7.5.1/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java#L47