これは、なにをしたくて書いたもの?
前に、Trinoを使ってMySQLとMinIOに格納されたデータにアクセスしてみました。
分散SQLクエリーエンジン、TrinoをUbuntu Linux 20.04 LTSにインストールしてMySQLに接続してみる - CLOVER🍀
Trinoから、Hive connectorでAmazon S3互換のオブジェクトストレージMinIOにアクセスしてみる - CLOVER🍀
今回は、MinIOとMySQLに格納されたデータをjoinしてアクセスしてみたいと思います。
環境
Trinoは、395を使用します。表示しているものはCLIのバージョンですが、サーバー側も同じバージョンを使用します。
$ trino --version
Trino CLI 395
Trinoサーバーは72.17.0.2で動作させますが、起動は後で行います。
Apache Hiveメタストアサービスは、3.0.0を使用します。また、172.17.0.3で動作させます。
Apache Hiveメタストアサービスに必要な、Apache Hadoopのバージョン。
$ /opt/hadoop/bin/hadoop version
Hadoop 3.3.4
Source code repository https://github.com/apache/hadoop.git -r a585a73c3e02ac62350c136643a5e7f6095a3dbb
Compiled by stevel on 2022-07-29T12:32Z
Compiled with protoc 3.7.1
From source with checksum fb9dd8918a7b8a5b430d61af858f6ec
This command was run using /opt/hadoop/share/hadoop/common/hadoop-common-3.3.4.jar
なお、TrinoもApache Hiveメタストアサービスも以下のJavaで動作させます。
$ java --version
openjdk 17.0.4 2022-07-19
OpenJDK Runtime Environment Temurin-17.0.4+8 (build 17.0.4+8)
OpenJDK 64-Bit Server VM Temurin-17.0.4+8 (build 17.0.4+8, mixed mode, sharing)
MinIO(Amazon S3)にアクセスするための、環境変数の設定はこちら。
$ export HADOOP_HOME=/opt/hadoop
$ AWS_SDK_JAR=$(find ${HADOOP_HOME}/share/hadoop/tools/lib -name 'aws-java-sdk-bundle-*.jar')
$ HADOOP_AWS_JAR=$(find ${HADOOP_HOME}/share/hadoop/tools/lib -name 'hadoop-aws-*.jar')
$ export HADOOP_CLASSPATH=${AWS_SDK_JAR}:${HADOOP_AWS_JAR}
Apache Hiveメタストアサービスの起動は、後で行います。
MinIOのバージョンは、こちら。
$ minio --version
minio version RELEASE.2022-09-07T22-25-02Z (commit-id=bb855499e1519f31c03c9b91c0f9f10cb6439253)
Runtime: go1.18.6 linux/amd64
License: GNU AGPLv3 <https://www.gnu.org/licenses/agpl-3.0.html>
Copyright: 2015-2022 MinIO, Inc.
MinIOは以下のコマンドで起動し、172.17.0.4で動作しているものとします。
$ MINIO_ROOT_USER=minioadmin MINIO_ROOT_PASSWORD=minioadmin minio server /var/lib/minio/data --console-address :9001
MinIO操作用のAWS CLIのバージョン。
$ aws --version
aws-cli/2.7.31 Python/3.9.11 Linux/5.4.0-125-generic exe/x86_64.ubuntu.20 prompt/off
クレデンシャルの設定。
$ export AWS_ACCESS_KEY_ID=minioadmin
$ export AWS_SECRET_ACCESS_KEY=minioadmin
$ export AWS_DEFAULT_REGION=ap-northeast-1
MySQLについてはこちら。172.17.0.5で動作しているものとし、接続情報はkazuhira
/password
で、practice
というデータベースを
作成済みとします。
$ mysql --version
mysql Ver 8.0.30 for Linux on x86_64 (MySQL Community Server - GPL)
データの準備
まずはデータの準備をしましょう。
MinIOにはCSVファイルを置くことにします。お題はサザエさんで、1行目をヘッダーにしたCSVファイルを3つ用意します。
isono-family.csv
family_id,id,first_name,last_name,age
1,1,サザエ,フグ田,24
1,2,マスオ,フグ田,28
1,3,波平,磯野,54
1,4,フネ,磯野,50
1,5,カツオ,磯野,11
1,6,ワカメ,磯野,9
1,7,タラオ,フグ田,3
namino-family.csv
family_id,id,first_name,last_name,age
2,1,ノリスケ,波野,26
2,2,タイコ,波野,22
2,3,イクラ,波野,1
isasaka-family.csv
family_id,id,first_name,last_name,age
3,1,難物,伊佐坂,60
3,2,お軽,伊佐坂,50
3,3,甚六,伊佐坂,20
3,4,浮江,伊佐,16
MinIOのエンドポイントはこちら。
$ MINIO_ENDPOINT=http://172.17.0.4:9000
バケットを作成して
$ aws s3 mb --endpoint-url $MINIO_ENDPOINT s3://trino-bucket
sync
でアップロード。
$ aws s3 sync --endpoint-url $MINIO_ENDPOINT . s3://trino-bucket/files
upload: ./isono-family.csv to s3://trino-bucket/files/isono-family.csv
upload: ./isasaka-family.csv to s3://trino-bucket/files/isasaka-family.csv
upload: ./namino-family.csv to s3://trino-bucket/files/namino-family.csv
確認。
$ aws s3 ls --endpoint-url $MINIO_ENDPOINT trino-bucket/files/
2022-09-13 22:46:43 131 isasaka-family.csv
2022-09-13 22:46:43 207 isono-family.csv
2022-09-13 22:46:43 112 namino-family.csv
MySQL側にもテーブルを作成します。MinIOにアップロードしたファイルとjoinする想定のものです。
create table family(
id integer,
name varchar(20),
primary key(id)
);
データの登録。
insert into family(id, name) values(1, '磯野家');
insert into family(id, name) values(2, '波野家');
insert into family(id, name) values(3, '伊佐坂');
確認。
mysql> select * from family;
+
| id | name |
+
| 1 | 磯野家 |
| 2 | 波野家 |
| 3 | 伊佐坂 |
+
3 rows in set (0.00 sec)
これで、データの準備は完了です。
TrinoとApache Hiveメタストアサービスの準備
続いては、TrinoおよびApache Hiveメタストアサービスの準備を行います。
まずはApache Hiveメタストアサービスから行いましょう。MinIOにアクセスするため、以下のように設定。
conf/metastore-site.xml
xml version="1.0" encoding="UTF-8" standalone="no"
xml-stylesheet type="text/xsl" href="configuration.xsl"
<configuration>
<property>
<name>metastore.thrift.uris</name>
<value>thrift://0.0.0.0:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
<property>
<name>metastore.task.threads.always</name>
<value>org.apache.hadoop.hive.metastore.events.EventCleanerTask</value>
</property>
<property>
<name>metastore.expression.proxy</name>
<value>org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy</value>
</property>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<value>minioadmin</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>minioadmin</value>
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>http://172.17.0.4:9000</value>
</property>
<property>
<name>fs.s3a.path.style.access</name>
<value>true</value>
</property>
</configuration>
起動。
$ bin/schematool -initSchema -dbType derby
$ bin/start-metastore
次に、Trinoの設定を行います。
Trino自体の設定。
etc/node.properties
node.environment=container
node.id=340fae6b-55fe-486e-b122-d0fbe61d0ebb
node.data-dir=/var/lib/trino-server/data
etc/jvm.config
-server
-Xmx2G
-XX:InitialRAMPercentage=80
-XX:MaxRAMPercentage=80
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=512M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000
-XX:+UnlockDiagnosticVMOptions
-XX:+UseAESCTRIntrinsics
etc/config.properties
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
discovery.uri=http://172.17.0.2:8080
続いて、Connectorの設定です。MinIOおよびMySQLにアクセスするための設定を行います。
MinIO。カタログ名はminio
としています。
etc/catalog/minio.properties
connector.name=hive
hive.metastore.uri=thrift://172.17.0.3:9083
hive.storage-format=ORC
hive.non-managed-table-writes-enabled=true
hive.non-managed-table-creates-enabled=true
hive.s3.aws-access-key=minioadmin
hive.s3.aws-secret-key=minioadmin
hive.s3.endpoint=http://172.17.0.4:9000
hive.s3.path-style-access=true
MySQL。カタログ名はmysql
としています。
etc/catalog/mysql.properties
connector.name=mysql
connection-url=jdbc:mysql://172.17.0.5:3306
connection-user=kazuhira
connection-password=password
これで、Trinoを起動。
$ bin/launcher run
TrinoからMinIOとMySQLにアクセスしてみる
準備が完了したので、TrinoからMinIOおよびMySQLにアクセスしてみましょう。
Trinoに接続。
$ trino --server 172.17.0.2:8080
trino>
まず、MinIOに向けてスキーマを作成します。
trino> create schema minio.bucket with(location = 's3a://trino-bucket/');
CREATE SCHEMA
MinIOのアップロードしたCSVファイルを参照するように、テーブルを作成。
create table minio.bucket.people (
family_id varchar,
id varchar,
first_name varchar,
last_name varchar,
age varchar
) with (
format = 'csv',
csv_separator = ',',
csv_quote = '"',
csv_escape = '"',
skip_header_line_count = 1,
external_location = 's3a://trino-bucket/files'
);
確認。
trino> select * from minio.bucket.people;
family_id | id | first_name | last_name | age
-----------+----+------------+-----------+-----
2 | 1 | ノリスケ | 波野 | 26
2 | 2 | タイコ | 波野 | 22
2 | 3 | イクラ | 波野 | 1
1 | 1 | サザエ | フグ田 | 24
1 | 2 | マスオ | フグ田 | 28
1 | 3 | 波平 | 磯野 | 54
1 | 4 | フネ | 磯野 | 50
1 | 5 | カツオ | 磯野 | 11
1 | 6 | ワカメ | 磯野 | 9
1 | 7 | タラオ | フグ田 | 3
3 | 1 | 難物 | 伊佐坂 | 60
3 | 2 | お軽 | 伊佐坂 | 50
3 | 3 | 甚六 | 伊佐坂 | 20
3 | 4 | 浮江 | 伊佐 | 16
(14 rows)
Query 20220913_134818_00002_68bq6, FINISHED, 1 node
Splits: 3 total, 3 done (100.00%)
1.79 [14 rows, 450B] [7 rows/s, 251B/s]
OKですね。
MySQL側は、あらかじめテーブルを作成しているため、Trinoからはすぐに認識できます。
trino> show tables from mysql.practice;
Table
family
(1 row)
Query 20220913_134822_00003_68bq6, FINISHED, 1 node
Splits: 7 total, 7 done (100.00%)
1.08 [1 rows, 24B] [0 rows/s, 22B/s]
テーブル定義や
trino> desc mysql.practice.family;
Column | Type | Extra | Comment
id | integer | |
name | varchar(20) | |
(2 rows)
Query 20220913_134852_00004_68bq6, FINISHED, 1 node
Splits: 7 total, 7 done (100.00%)
0.61 [2 rows, 120B] [3 rows/s, 197B/s]
データの中身も確認できますね。
trino> select * from mysql.practice.family;
id | name
1 | 磯野家
2 | 波野家
3 | 伊佐坂
(3 rows)
Query 20220913_134903_00005_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.43 [3 rows, 0B] [6 rows/s, 0B/s]
では、この2つのテーブルをjoinしてみましょう。
SELECT — Trino 395 Documentation
Trinoでは、以下のjoinの種類をサポートしているようです。
[ INNER ] JOIN
LEFT [ OUTER ] JOIN
RIGHT [ OUTER ] JOIN
FULL [ OUTER ] JOIN
CROSS JOIN
SQLを作成して
select p.id, f.name as family_name, p.first_name, p.last_name, p.age
from minio.bucket.people as p
inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
order by family_name, cast(p.age as integer) desc;
確認。
trino> select p.id, f.name as family_name, p.first_name, p.last_name, p.age
-> from minio.bucket.people as p
-> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
-> order by family_name, cast(p.age as integer) desc;
id | family_name | first_name | last_name | age
1 | 伊佐坂 | 難物 | 伊佐坂 | 60
2 | 伊佐坂 | お軽 | 伊佐坂 | 50
3 | 伊佐坂 | 甚六 | 伊佐坂 | 20
4 | 伊佐坂 | 浮江 | 伊佐 | 16
1 | 波野家 | ノリスケ | 波野 | 26
2 | 波野家 | タイコ | 波野 | 22
3 | 波野家 | イクラ | 波野 | 1
3 | 磯野家 | 波平 | 磯野 | 54
4 | 磯野家 | フネ | 磯野 | 50
2 | 磯野家 | マスオ | フグ田 | 28
1 | 磯野家 | サザエ | フグ田 | 24
5 | 磯野家 | カツオ | 磯野 | 11
6 | 磯野家 | ワカメ | 磯野 | 9
7 | 磯野家 | タラオ | フグ田 | 3
(14 rows)
Query 20220913_134916_00006_68bq6, FINISHED, 1 node
Splits: 15 total, 15 done (100.00%)
1.05 [20 rows, 606B] [19 rows/s, 578B/s]
すごくあっさり動きました。
今回CSVファイルからテーブルを作っているので、すべてのカラムが文字列になっています。このままだとMySQLにあるテーブルの列と
joinする際に困るのでキャストしていたのですが。
もしもキャストをやめた場合は、以下のように型が合わずにエラーになります。
trino> select p.id, f.name as family_name, p.first_name, p.last_name, p.age
-> from minio.bucket.people as p
-> inner join mysql.practice.family as f on p.family_id as integer = f.id
-> order by family_name, cast(p.age as integer) desc;
Query 20220912_162900_00017_kiyr9 failed: line 3:54: mismatched input 'as'. Expecting: '%', '*', '+', ',', '-', '.', '/', 'AND', 'AT', 'CROSS', 'EXCEPT', 'FETCH', 'FULL', 'GROUP', 'HAVING', 'INNER', 'INTERSECT', 'JOIN', 'LEFT', 'LIMIT', 'NATURAL', 'OFFSET', 'OR', 'ORDER', 'RIGHT', 'UNION', 'WHERE', 'WINDOW', '[', '||', <EOF>, <predicate>
select p.id, f.name as family_name, p.first_name, p.last_name, p.age
from minio.bucket.people as p
inner join mysql.practice.family as f on p.family_id as integer = f.id
order by family_name, cast(p.age as integer) desc
このあたりは、ゆるっとはいかないようですね。
あと、せっかくなのでexplain
も行ってみましょう。
先に統計情報を見ておきます。
SHOW STATS — Trino 395 Documentation
trino> show stats for minio.bucket.people;
column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
family_id | NULL | NULL | NULL | NULL | NULL | NULL
id | NULL | NULL | NULL | NULL | NULL | NULL
first_name | NULL | NULL | NULL | NULL | NULL | NULL
last_name | NULL | NULL | NULL | NULL | NULL | NULL
age | NULL | NULL | NULL | NULL | NULL | NULL
NULL | NULL | NULL | NULL | NULL | NULL | NULL
(6 rows)
Query 20220913_135248_00007_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.27 [0 rows, 0B] [0 rows/s, 0B/s]
trino> show stats for mysql.practice.family;
column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
id | NULL | 2.0 | 0.0 | NULL | NULL | NULL
name | NULL | NULL | NULL | NULL | NULL | NULL
NULL | NULL | NULL | NULL | 3.0 | NULL | NULL
(3 rows)
Query 20220913_135305_00008_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.26 [0 rows, 0B] [0 rows/s, 0B/s]
MinIO側の統計情報は全部null
ですね。
analyze
してみます。
ANALYZE — Trino 395 Documentation
trino> analyze minio.bucket.people;
ANALYZE: 14 rows
Query 20220913_135411_00009_68bq6, FINISHED, 1 node
Splits: 13 total, 13 done (100.00%)
1.15 [14 rows, 450B] [12 rows/s, 392B/s]
trino> analyze mysql.practice.family;
Query 20220913_135421_00010_68bq6 failed: This connector does not support analyze
MySQL connectorはanalyze
をサポートしていないと言われました。
再度統計情報を確認。
trino> show stats for minio.bucket.people;
column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
family_id | 14.0 | 3.0 | 0.0 | NULL | NULL | NULL
id | 14.0 | 7.0 | 0.0 | NULL | NULL | NULL
first_name | 111.0 | 14.0 | 0.0 | NULL | NULL | NULL
last_name | 102.0 | 5.0 | 0.0 | NULL | NULL | NULL
age | 25.0 | 13.0 | 0.0 | NULL | NULL | NULL
NULL | NULL | NULL | NULL | 14.0 | NULL | NULL
(6 rows)
Query 20220913_135513_00011_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.24 [0 rows, 0B] [0 rows/s, 0B/s]
trino> show stats for mysql.practice.family;
column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
id | NULL | 2.0 | 0.0 | NULL | NULL | NULL
name | NULL | NULL | NULL | NULL | NULL | NULL
NULL | NULL | NULL | NULL | 3.0 | NULL | NULL
(3 rows)
Query 20220913_135516_00012_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.25 [0 rows, 0B] [0 rows/s, 0B/s]
MinIO側にも、統計情報が入りました。
では、explain
してみます。
Cost in EXPLAIN — Trino 395 Documentation
結果。
trino> explain select p.id, f.name as family_name, p.first_name, p.last_name, p.age
-> from minio.bucket.people as p
-> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
-> order by family_name, cast(p.age as integer) desc;
Query Plan
Fragment 0 [SINGLE]
Output layout: [id, name, first_name, last_name, age]
Output partitioning: SINGLE []
Output[columnNames = [id, family_name, first_name, last_name, age]]
│ Layout: [id:varchar, name:varchar(20), first_name:varchar, last_name:varchar, age:varchar]
│ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
│ family_name := name
└─ Project[]
│ Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)]
│ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
└─ RemoteMerge[sourceFragmentIds = [1]]
Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
Estimates:
Fragment 1 [ROUND_ROBIN]
Output layout: [name, last_name, id, first_name, age, expr_1]
Output partitioning: SINGLE []
LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
└─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
└─ RemoteSource[sourceFragmentIds = [2]]
Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
Estimates:
Fragment 2 [SOURCE]
Output layout: [name, last_name, id, first_name, age, expr_1]
Output partitioning: ROUND_ROBIN []
Project[]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B}
│ expr_1 := CAST("age" AS integer)
└─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED]
│ Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)]
│ Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B}
│ Distribution: REPLICATED
├─ Project[]
│ │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint]
│ │ Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B}
│ │ $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0))
│ └─ ScanProject[table = minio:bucket:people]
│ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar]
│ Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B}
│ expr := CAST("family_id" AS integer)
│ family_id := family_id:string:REGULAR
│ last_name := last_name:string:REGULAR
│ id := id:string:REGULAR
│ first_name := first_name:string:REGULAR
│ age := age:string:REGULAR
└─ LocalExchange[partitioning = SINGLE]
│ Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint]
│ Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B}
└─ RemoteSource[sourceFragmentIds = [3]]
Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint]
Estimates:
Fragment 3 [SOURCE]
Output layout: [id_0, name, $hashvalue_4]
Output partitioning: BROADCAST []
ScanProject[table = mysql:practice.family practice.family]
Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint]
Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B}
$hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
name := name:varchar(20):VARCHAR
id_0 := id:integer:INT
(1 row)
Query 20220913_135715_00013_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.78 [0 rows, 0B] [0 rows/s, 0B/s]
explain
は、以下のことが可能なようです。
デフォルトは分散実行計画(DISTRIBUTED)をテキスト形式で表示します。
実行例を変えてみましょう。論理実行計画(LOGICAL)をテキスト形式で表示すると、こんな感じになりました。
trino> explain(type logical, format text) select p.id, f.name as family_name, p.first_name, p.last_name, p.age
-> from minio.bucket.people as p
-> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
-> order by family_name, cast(p.age as integer) desc;
Query Plan
Output[columnNames = [id, family_name, first_name, last_name, age]]
│ Layout: [id:varchar, name:varchar(20), first_name:varchar, last_name:varchar, age:varchar]
│ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
│ family_name := name
└─ Project[]
│ Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)]
│ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
└─ RemoteMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
└─ LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
└─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
└─ RemoteExchange[type = REPARTITION]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: 7.14k, memory: 207B, network: 1.54kB}
└─ Project[]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B}
│ expr_1 := CAST("age" AS integer)
└─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED]
│ Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)]
│ Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B}
│ Distribution: REPLICATED
├─ Project[]
│ │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint]
│ │ Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B}
│ │ $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0))
│ └─ ScanProject[table = minio:bucket:people]
│ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar]
│ Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B}
│ expr := CAST("family_id" AS integer)
│ family_id := family_id:string:REGULAR
│ last_name := last_name:string:REGULAR
│ id := id:string:REGULAR
│ first_name := first_name:string:REGULAR
│ age := age:string:REGULAR
└─ LocalExchange[partitioning = SINGLE]
│ Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint]
│ Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B}
└─ RemoteExchange[type = REPLICATE]
│ Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint]
│ Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B}
└─ ScanProject[table = mysql:practice.family practice.family]
Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint]
Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B}
$hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
name := name:varchar(20):VARCHAR
id_0 := id:integer:INT
(1 row)
Query 20220913_140407_00015_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.34 [0 rows, 0B] [0 rows/s, 0B/s]
explain
の説明には、分散実行計画の読み方が書かれているのでこちらを見てみましょう。
分散プランの各プランフラグメントは、ひとつまたは複数ノードでのTrinoノードで実行されることを表しています。
Each plan fragment of the distributed plan is executed by a single or multiple Trino nodes.
フラグメントが分かれているということは、Trinoノード間でデータの交換が行われることを表しています。
Fragments separation represent the data exchange between Trino nodes.
フラグメントの種類は、フラグメントがTrinoノードによってどのように実行されるか、そしてどのようにフラグメント間でデータが分散されるかを
示しています。
Fragment type specifies how the fragment is executed by Trino nodes and how the data is distributed between fragments:
フラグメントの種類は、次の4つがあります。
SINGLE
… フラグメントは単一のノードで実行される
HASH
… フラグメントは、ハッシュ関数を使用して分散した入力データを使用し、固定数のノードで実行される
ROUND_ROBIN
… フラグメントは、入力データをすべてのノードにブロードキャストし、固定数のノードで実行される
SOURCE
… フラグメントは、分割入力にアクセスできるノードで実行される
ここで、最初に実行した分散実行計画を見返してみます。
Fragment 0 [SINGLE]
Output layout: [id, name, first_name, last_name, age]
Output partitioning: SINGLE []
Output[columnNames = [id, family_name, first_name, last_name, age]]
│ Layout: [id:varchar, name:varchar(20), first_name:varchar, last_name:varchar, age:varchar]
│ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
│ family_name := name
└─ Project[]
│ Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)]
│ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
└─ RemoteMerge[sourceFragmentIds = [1]]
Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
Estimates:
Fragment 1 [ROUND_ROBIN]
Output layout: [name, last_name, id, first_name, age, expr_1]
Output partitioning: SINGLE []
LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
└─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
└─ RemoteSource[sourceFragmentIds = [2]]
Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
Estimates:
Fragment 2 [SOURCE]
Output layout: [name, last_name, id, first_name, age, expr_1]
Output partitioning: ROUND_ROBIN []
Project[]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B}
│ expr_1 := CAST("age" AS integer)
└─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED]
│ Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)]
│ Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B}
│ Distribution: REPLICATED
├─ Project[]
│ │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint]
│ │ Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B}
│ │ $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0))
│ └─ ScanProject[table = minio:bucket:people]
│ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar]
│ Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B}
│ expr := CAST("family_id" AS integer)
│ family_id := family_id:string:REGULAR
│ last_name := last_name:string:REGULAR
│ id := id:string:REGULAR
│ first_name := first_name:string:REGULAR
│ age := age:string:REGULAR
└─ LocalExchange[partitioning = SINGLE]
│ Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint]
│ Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B}
└─ RemoteSource[sourceFragmentIds = [3]]
Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint]
Estimates:
Fragment 3 [SOURCE]
Output layout: [id_0, name, $hashvalue_4]
Output partitioning: BROADCAST []
ScanProject[table = mysql:practice.family practice.family]
Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint]
Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B}
$hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
name := name:varchar(20):VARCHAR
id_0 := id:integer:INT
フラグメントは4つあります。
Fragment 0 [SINGLE]
Fragment 1 [ROUND_ROBIN]
Fragment 2 [SOURCE]
Fragment 3 [SOURCE]
SOURCE
を見ると、入力となるテーブルの情報が現れます。
こちらはMySQL側。ScanProject
という項目を見ると、テーブル名が出ていますね。
Fragment 3 [SOURCE]
Output layout: [id_0, name, $hashvalue_4]
Output partitioning: BROADCAST []
ScanProject[table = mysql:practice.family practice.family]
Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint]
Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B}
$hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
name := name:varchar(20):VARCHAR
id_0 := id:integer:INT
MinIOの方はちょっと奥ですが、こちらもScanProject
があります。
Fragment 2 [SOURCE]
Output layout: [name, last_name, id, first_name, age, expr_1]
Output partitioning: ROUND_ROBIN []
Project[]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ 〜省略〜
│
└─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED]
│ Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)]
│ 〜省略〜
│
├─ Project[]
│ │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint]
│ │ 〜省略〜
│ │
│ └─ ScanProject[table = minio:bucket:people]
│ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar]
│ 〜省略〜
│
└─ LocalExchange[partitioning = SINGLE]
│ Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint]
│ Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B}
└─ RemoteSource[sourceFragmentIds = [3]]
Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint]
Estimates:
よく見ると、RemoteSource
とInnerJoin
していることも書かれていますね。
そして、このデータをROUND_ROGIN
(ブロードキャスト)してソート。
Fragment 1 [ROUND_ROBIN]
Output layout: [name, last_name, id, first_name, age, expr_1]
Output partitioning: SINGLE []
LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
└─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
└─ RemoteSource[sourceFragmentIds = [2]]
Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
Estimates:
ここでは、RemoteSource
がFragment 2であることも書かれています。
最後に単一ノードにマージ、という感じですね。
Fragment 0 [SINGLE]
Output layout: [id, name, first_name, last_name, age]
Output partitioning: SINGLE []
Output[columnNames = [id, family_name, first_name, last_name, age]]
│ Layout: [id:varchar, name:varchar(20), first_name:varchar, last_name:varchar, age:varchar]
│ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
│ family_name := name
└─ Project[]
│ Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)]
│ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
└─ RemoteMerge[sourceFragmentIds = [1]]
Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
Estimates:
フラグメントは、RemoteSource
を見ていくとわかるような気がします。
こう見た後に論理実行計画(LOGICAL)を見返すと、読めそうな感じがしてきますね。
explain analyze
を使うと、CPU時間やコストも確認できるようです。こちらは分散実行計画固定のようです。
EXPLAIN ANALYZE — Trino 395 Documentation
trino> explain analyze select p.id, f.name as family_name, p.first_name, p.last_name, p.age
-> from minio.bucket.people as p
-> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
-> order by family_name, cast(p.age as integer) desc;
Query Plan
Fragment 1 [SINGLE]
CPU: 3.81ms, Scheduled: 3.86ms, Blocked 542.91ms (Input: 98.34ms, Output: 0.00ns), Input: 14 rows (798B); per task: avg.: 14.00 std.dev.: 0.00, Output: 14 rows (728B)
Output layout: [id, first_name, last_name, age, name]
Output partitioning: SINGLE []
Project[]
│ Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)]
│ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
│ CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 0.00ns (0.00%), Output: 14 rows (728B)
│ Input avg.: 3.50 rows, Input std.dev.: 173.21%
└─ LocalExchange[partitioning = ROUND_ROBIN]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
│ CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 334.00ms (34.15%), Output: 14 rows (798B)
│ Input avg.: 14.00 rows, Input std.dev.: 0.00%
└─ RemoteMerge[sourceFragmentIds = [2]]
Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
Estimates:
CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 98.00ms (10.02%), Output: 14 rows (798B)
Input avg.: 14.00 rows, Input std.dev.: 0.00%
Fragment 2 [ROUND_ROBIN]
CPU: 7.88ms, Scheduled: 9.80ms, Blocked 448.91ms (Input: 326.76ms, Output: 0.00ns), Input: 14 rows (798B); per task: avg.: 14.00 std.dev.: 0.00, Output: 14 rows (798B)
Output layout: [name, last_name, id, first_name, age, expr_1]
Output partitioning: SINGLE []
LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
│ CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 100.00ms (10.22%), Output: 14 rows (798B)
│ Input avg.: 3.50 rows, Input std.dev.: 100.00%
└─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
│ CPU: 3.00ms (4.92%), Scheduled: 4.00ms (4.55%), Blocked: 0.00ns (0.00%), Output: 14 rows (798B)
│ Input avg.: 3.50 rows, Input std.dev.: 100.00%
└─ RemoteSource[sourceFragmentIds = [3]]
Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
Estimates:
CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 327.00ms (33.44%), Output: 14 rows (798B)
Input avg.: 3.50 rows, Input std.dev.: 100.00%
Fragment 3 [SOURCE]
CPU: 40.07ms, Scheduled: 58.75ms, Blocked 119.42ms (Input: 55.91ms, Output: 0.00ns), Input: 17 rows (534B); per task: avg.: 17.00 std.dev.: 0.00, Output: 14 rows (798B)
Output layout: [name, last_name, id, first_name, age, expr_1]
Output partitioning: ROUND_ROBIN []
Project[]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B}
│ CPU: 3.00ms (4.92%), Scheduled: 4.00ms (4.55%), Blocked: 0.00ns (0.00%), Output: 14 rows (798B)
│ Input avg.: 4.67 rows, Input std.dev.: 36.42%
│ expr_1 := CAST("age" AS integer)
└─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED]
│ Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)]
│ Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B}
│ CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 41.00ms (4.19%), Output: 14 rows (728B)
│ Left (probe) Input avg.: 4.67 rows, Input std.dev.: 36.42%
│ Right (build) Input avg.: 3.00 rows, Input std.dev.: 0.00%
│ Collisions avg.: 0.00 (0.00% est.), Collisions std.dev.: ?%
│ Distribution: REPLICATED
├─ Project[]
│ │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint]
│ │ Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B}
│ │ CPU: 2.00ms (3.28%), Scheduled: 2.00ms (2.27%), Blocked: 0.00ns (0.00%), Output: 14 rows (728B)
│ │ Input avg.: 4.67 rows, Input std.dev.: 36.42%
│ │ $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0))
│ └─ ScanProject[table = minio:bucket:people]
│ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar]
│ Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B}
│ CPU: 30.00ms (49.18%), Scheduled: 47.00ms (53.41%), Blocked: 0.00ns (0.00%), Output: 14 rows (602B)
│ Input avg.: 4.67 rows, Input std.dev.: 36.42%
│ expr := CAST("family_id" AS integer)
│ family_id := family_id:string:REGULAR
│ last_name := last_name:string:REGULAR
│ id := id:string:REGULAR
│ first_name := first_name:string:REGULAR
│ age := age:string:REGULAR
│ Input: 14 rows (450B), Filtered: 0.00%
└─ LocalExchange[partitioning = SINGLE]
│ Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint]
│ Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B}
│ CPU: 0.00ns (0.00%), Scheduled: 1.00ms (1.14%), Blocked: 22.00ms (2.25%), Output: 3 rows (84B)
│ Input avg.: 0.75 rows, Input std.dev.: 173.21%
└─ RemoteSource[sourceFragmentIds = [4]]
Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint]
Estimates:
CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 56.00ms (5.73%), Output: 3 rows (84B)
Input avg.: 0.75 rows, Input std.dev.: 173.21%
Fragment 4 [SOURCE]
CPU: 18.60ms, Scheduled: 26.09ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 3 rows (0B); per task: avg.: 3.00 std.dev.: 0.00, Output: 3 rows (84B)
Output layout: [id_0, name, $hashvalue_4]
Output partitioning: BROADCAST []
ScanProject[table = mysql:practice.family practice.family]
Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint]
Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B}
CPU: 18.00ms (29.51%), Scheduled: 25.00ms (28.41%), Blocked: 0.00ns (0.00%), Output: 3 rows (84B)
Input avg.: 3.00 rows, Input std.dev.: 0.00%
$hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
name := name:varchar(20):VARCHAR
id_0 := id:integer:INT
Input: 3 rows (0B), Filtered: 0.00%
(1 row)
Query 20220913_142608_00016_68bq6, FINISHED, 1 node
Splits: 24 total, 24 done (100.00%)
0.59 [20 rows, 606B] [34 rows/s, 1.01KB/s]
explain analyze verbose
を使うと、使用しているオペレーターによっては追加情報を出力してくれるようです。
trino> explain analyze verbose select p.id, f.name as family_name, p.first_name, p.last_name, p.age
-> from minio.bucket.people as p
-> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
-> order by family_name, cast(p.age as integer) desc;
Query Plan
Fragment 1 [SINGLE]
CPU: 3.29ms, Scheduled: 4.07ms, Blocked 336.69ms (Input: 66.94ms, Output: 0.00ns), Input: 14 rows (798B); per task: avg.: 14.00 std.dev.: 0.00, Output: 14 rows (728B)
Output layout: [id, first_name, last_name, age, name]
Output partitioning: SINGLE []
Project[]
│ Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)]
│ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
│ CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 0.00ns (0.00%), Output: 14 rows (728B)
│ metrics:
│ 'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=14.00, p90=14.00, p95=14.00, p99=14.00, min=0.00, max=14.00}
│ Input avg.: 3.50 rows, Input std.dev.: 173.21%
└─ LocalExchange[partitioning = ROUND_ROBIN]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
│ CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 270.00ms (32.93%), Output: 14 rows (798B)
│ metrics:
│ 'Input distribution' = {count=1.00, p01=14.00, p05=14.00, p10=14.00, p25=14.00, p50=14.00, p75=14.00, p90=14.00, p95=14.00, p99=14.00, min=14.00, max=14.00}
│ Input avg.: 14.00 rows, Input std.dev.: 0.00%
└─ RemoteMerge[sourceFragmentIds = [2]]
Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
Estimates:
CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 67.00ms (8.17%), Output: 14 rows (798B)
metrics:
'Input distribution' = {count=1.00, p01=14.00, p05=14.00, p10=14.00, p25=14.00, p50=14.00, p75=14.00, p90=14.00, p95=14.00, p99=14.00, min=14.00, max=14.00}
Input avg.: 14.00 rows, Input std.dev.: 0.00%
Fragment 2 [ROUND_ROBIN]
CPU: 5.67ms, Scheduled: 7.79ms, Blocked 270.43ms (Input: 213.22ms, Output: 0.00ns), Input: 14 rows (798B); per task: avg.: 14.00 std.dev.: 0.00, Output: 14 rows (798B)
Output layout: [name, last_name, id, first_name, age, expr_1]
Output partitioning: SINGLE []
LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
│ CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 57.00ms (6.95%), Output: 14 rows (798B)
│ metrics:
│ 'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=3.00, p75=11.00, p90=11.00, p95=11.00, p99=11.00, min=0.00, max=11.00}
│ Input avg.: 3.50 rows, Input std.dev.: 128.57%
└─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
│ CPU: 2.00ms (3.77%), Scheduled: 3.00ms (3.90%), Blocked: 0.00ns (0.00%), Output: 14 rows (798B)
│ metrics:
│ 'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=3.00, p75=11.00, p90=11.00, p95=11.00, p99=11.00, min=0.00, max=11.00}
│ Input avg.: 3.50 rows, Input std.dev.: 128.57%
└─ RemoteSource[sourceFragmentIds = [3]]
Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
Estimates:
CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 213.00ms (25.98%), Output: 14 rows (798B)
metrics:
'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=3.00, p75=11.00, p90=11.00, p95=11.00, p99=11.00, min=0.00, max=11.00}
Input avg.: 3.50 rows, Input std.dev.: 128.57%
Fragment 3 [SOURCE]
CPU: 37.00ms, Scheduled: 57.84ms, Blocked 255.04ms (Input: 152.00ms, Output: 0.00ns), Input: 17 rows (534B); per task: avg.: 17.00 std.dev.: 0.00, Output: 14 rows (798B)
Output layout: [name, last_name, id, first_name, age, expr_1]
Output partitioning: ROUND_ROBIN []
Project[]
│ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
│ Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B}
│ CPU: 3.00ms (5.66%), Scheduled: 3.00ms (3.90%), Blocked: 0.00ns (0.00%), Output: 14 rows (798B)
│ metrics:
│ 'Input distribution' = {count=3.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=4.00, p75=7.00, p90=7.00, p95=7.00, p99=7.00, min=3.00, max=7.00}
│ Input avg.: 4.67 rows, Input std.dev.: 36.42%
│ expr_1 := CAST("age" AS integer)
└─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED]
│ Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)]
│ Reorder joins cost : {rows: 14 (1.27kB), cpu: 3.58k, memory: 180B, network: 180B}
│ Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B}
│ CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 23.00ms (2.80%), Output: 14 rows (728B)
│ Left (probe) metrics:
│ 'Input distribution' = {count=3.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=4.00, p75=7.00, p90=7.00, p95=7.00, p99=7.00, min=3.00, max=7.00}
│ Right (build) metrics:
│ 'Input distribution' = {count=1.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=3.00, p75=3.00, p90=3.00, p95=3.00, p99=3.00, min=3.00, max=3.00}
│ Left (probe) Input avg.: 4.67 rows, Input std.dev.: 36.42%
│ Right (build) Input avg.: 3.00 rows, Input std.dev.: 0.00%
│ Collisions avg.: 0.00 (0.00% est.), Collisions std.dev.: ?%
│ Distribution: REPLICATED
├─ Project[]
│ │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint]
│ │ Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B}
│ │ CPU: 2.00ms (3.77%), Scheduled: 2.00ms (2.60%), Blocked: 0.00ns (0.00%), Output: 14 rows (728B)
│ │ metrics:
│ │ 'Input distribution' = {count=3.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=4.00, p75=7.00, p90=7.00, p95=7.00, p99=7.00, min=3.00, max=7.00}
│ │ Input avg.: 4.67 rows, Input std.dev.: 36.42%
│ │ $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0))
│ └─ ScanProject[table = minio:bucket:people]
│ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar]
│ Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B}
│ CPU: 27.00ms (50.94%), Scheduled: 46.00ms (59.74%), Blocked: 0.00ns (0.00%), Output: 14 rows (602B)
│ metrics:
│ 'Input distribution' = {count=3.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=4.00, p75=7.00, p90=7.00, p95=7.00, p99=7.00, min=3.00, max=7.00}
│ Input avg.: 4.67 rows, Input std.dev.: 36.42%
│ expr := CAST("family_id" AS integer)
│ family_id := family_id:string:REGULAR
│ last_name := last_name:string:REGULAR
│ id := id:string:REGULAR
│ first_name := first_name:string:REGULAR
│ age := age:string:REGULAR
│ Input: 14 rows (450B), Filtered: 0.00%
└─ LocalExchange[partitioning = SINGLE]
│ Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint]
│ Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B}
│ CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 38.00ms (4.63%), Output: 3 rows (84B)
│ metrics:
│ 'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=3.00, p90=3.00, p95=3.00, p99=3.00, min=0.00, max=3.00}
│ Input avg.: 0.75 rows, Input std.dev.: 173.21%
└─ RemoteSource[sourceFragmentIds = [4]]
Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint]
Estimates:
CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 152.00ms (18.54%), Output: 3 rows (84B)
metrics:
'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=3.00, p90=3.00, p95=3.00, p99=3.00, min=0.00, max=3.00}
Input avg.: 0.75 rows, Input std.dev.: 173.21%
Fragment 4 [SOURCE]
CPU: 14.91ms, Scheduled: 19.48ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 3 rows (0B); per task: avg.: 3.00 std.dev.: 0.00, Output: 3 rows (84B)
Output layout: [id_0, name, $hashvalue_4]
Output partitioning: BROADCAST []
ScanProject[table = mysql:practice.family practice.family]
Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint]
Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B}
CPU: 14.00ms (26.42%), Scheduled: 18.00ms (23.38%), Blocked: 0.00ns (0.00%), Output: 3 rows (84B)
connector metrics:
'Physical input read time' = {duration=1.00ms}
metrics:
'Input distribution' = {count=1.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=3.00, p75=3.00, p90=3.00, p95=3.00, p99=3.00, min=3.00, max=3.00}
Input avg.: 3.00 rows, Input std.dev.: 0.00%
$hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
name := name:varchar(20):VARCHAR
id_0 := id:integer:INT
Input: 3 rows (0B), Filtered: 0.00%
(1 row)
Query 20220913_143008_00023_68bq6, FINISHED, 1 node
Splits: 24 total, 24 done (100.00%)
0.51 [20 rows, 606B] [39 rows/s, 1.16KB/s]
joinの話は、コストベースの最適化に関するページにも書かれています。
Cost based optimizations — Trino 395 Documentation
クエリーでjoinが実行される順序は、パフォーマンスに大きな影響を与える可能性があります。処理されるデータ量、転送されるデータ量が
大きな要因になります。
このためコストベースのjoin戦略の場合、Trinoではテーブルの統計情報からjoinの順序のコストを見積もり、最もコストが低いjoinの順序を
選択します。
Cost based optimizations / Join enumeration
この挙動は、join_reordering_strategy
プロパティで3つの値から指定することになります。
また、Trinoではjoinにハッシュベースのjoinアルゴリズムを使用しますが、これはjoin演算子ごとに、入力ごとにハッシュテーブルを作成する
必要があることを意味します。これをビルド側と呼ぶようです。
もうひとつの入力では、各行ごとにハッシュテーブルをクエリーします。こちらをプローブ側と呼びます。
joinの際には、データの一部からハッシュテーブルを構築するPartitionedと、全データからハッシュテーブルを作成して各ノードに複製する
Broadcastの2つの戦略があります。
Cost based optimizations / Join distribution selection
この挙動は、join_distribution_type
プロパティで3つの値から指定することになります。
いずれの戦略にもトレードオフがあり、デフォルトではコストベースの選択で自動的にPartitionedかBroadcastかのいずれかのjoinを選択します。
つまり、いかにしてデータを絞り込んでアクセスするかが重要になりそうな感じですね。
あとはプッシュダウンという概念もあるようですが、こちらはまたいずれ。
Pushdown — Trino 395 Documentation
まとめ
Trinoを使って、MinIOとMySQLのデータをjoinしてアクセスしてみました。
これ自体は非常に簡単にいきました。
合わせて、explain
やanalyze
、joinの話を見ることになったのでいろいろと勉強になりましたね。個人的には、けっこう興味を持てました。