What did I want to do here?
A while back, I used Trino to access data stored in MySQL and MinIO.
Installing the distributed SQL query engine Trino on Ubuntu Linux 20.04 LTS and connecting it to MySQL - CLOVER🍀
Accessing MinIO, an Amazon S3-compatible object storage, from Trino with the Hive connector - CLOVER🍀
This time, I would like to join data stored in MinIO and MySQL.
Environment
Trino 395 is used. What is shown below is the CLI version, but the server side uses the same version.
$ trino --version
Trino CLI 395
The Trino server will run on 172.17.0.2 (it will be started later).
The Apache Hive metastore service is version 3.0.0 and runs on 172.17.0.3.
The Apache Hadoop version required by the Apache Hive metastore service:
$ /opt/hadoop/bin/hadoop version
Hadoop 3.3.4
Source code repository https://github.com/apache/hadoop.git -r a585a73c3e02ac62350c136643a5e7f6095a3dbb
Compiled by stevel on 2022-07-29T12:32Z
Compiled with protoc 3.7.1
From source with checksum fb9dd8918a7b8a5b430d61af858f6ec
This command was run using /opt/hadoop/share/hadoop/common/hadoop-common-3.3.4.jar
Both Trino and the Apache Hive metastore service run on the following Java:
$ java --version
openjdk 17.0.4 2022-07-19
OpenJDK Runtime Environment Temurin-17.0.4+8 (build 17.0.4+8)
OpenJDK 64-Bit Server VM Temurin-17.0.4+8 (build 17.0.4+8, mixed mode, sharing)
The environment variables needed to access MinIO (Amazon S3):
$ export HADOOP_HOME=/opt/hadoop
$ AWS_SDK_JAR=$(find ${HADOOP_HOME}/share/hadoop/tools/lib -name 'aws-java-sdk-bundle-*.jar')
$ HADOOP_AWS_JAR=$(find ${HADOOP_HOME}/share/hadoop/tools/lib -name 'hadoop-aws-*.jar')
$ export HADOOP_CLASSPATH=${AWS_SDK_JAR}:${HADOOP_AWS_JAR}
The Apache Hive metastore service will be started later.
The MinIO version:
$ minio --version
minio version RELEASE.2022-09-07T22-25-02Z (commit-id=bb855499e1519f31c03c9b91c0f9f10cb6439253)
Runtime: go1.18.6 linux/amd64
License: GNU AGPLv3 <https://www.gnu.org/licenses/agpl-3.0.html>
Copyright: 2015-2022 MinIO, Inc.
MinIO is started with the following command and is assumed to be running on 172.17.0.4.
$ MINIO_ROOT_USER=minioadmin MINIO_ROOT_PASSWORD=minioadmin minio server /var/lib/minio/data --console-address :9001
MinIO is operated with the AWS CLI.
$ aws --version
aws-cli/2.7.31 Python/3.9.11 Linux/5.4.0-125-generic exe/x86_64.ubuntu.20 prompt/off
Credential settings:
$ export AWS_ACCESS_KEY_ID=minioadmin
$ export AWS_SECRET_ACCESS_KEY=minioadmin
$ export AWS_DEFAULT_REGION=ap-northeast-1
As for MySQL, it is assumed to be running on 172.17.0.5, with connection credentials kazuhira / password and a database named practice already created.
$ mysql --version
mysql Ver 8.0.30 for Linux on x86_64 (MySQL Community Server - GPL)
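For reference, here is a minimal sketch of the MySQL-side setup assumed above; the exact privileges granted are my assumption and were not covered in this post.

create database practice;
create user 'kazuhira'@'%' identified by 'password';
-- granting everything on the practice database is an assumption; narrower privileges would also work
grant all privileges on practice.* to 'kazuhira'@'%';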
Preparing the data
First, let's prepare the data.
CSV files will be placed in MinIO. The theme is Sazae-san; I prepared three CSV files, each with a header on the first line.
isono-family.csv
family_id,id,first_name,last_name,age
1,1,サザエ,フグ田,24
1,2,マスオ,フグ田,28
1,3,波平,磯野,54
1,4,フネ,磯野,50
1,5,カツオ,磯野,11
1,6,ワカメ,磯野,9
1,7,タラオ,フグ田,3
namino-family.csv
family_id,id,first_name,last_name,age
2,1,ノリスケ,波野,26
2,2,タイコ,波野,22
2,3,イクラ,波野,1
isasaka-family.csv
family_id,id,first_name,last_name,age
3,1,難物,伊佐坂,60
3,2,お軽,伊佐坂,50
3,3,甚六,伊佐坂,20
3,4,浮江,伊佐,16
The MinIO endpoint:
$ MINIO_ENDPOINT=http://172.17.0.4:9000
Create a bucket,
$ aws s3 mb --endpoint-url $MINIO_ENDPOINT s3://trino-bucket
and upload the files with sync.
$ aws s3 sync --endpoint-url $MINIO_ENDPOINT . s3://trino-bucket/files
upload: ./isono-family.csv to s3://trino-bucket/files/isono-family.csv
upload: ./isasaka-family.csv to s3://trino-bucket/files/isasaka-family.csv
upload: ./namino-family.csv to s3://trino-bucket/files/namino-family.csv
Check:
$ aws s3 ls --endpoint-url $MINIO_ENDPOINT trino-bucket/files/
2022-09-13 22:46:43 131 isasaka-family.csv
2022-09-13 22:46:43 207 isono-family.csv
2022-09-13 22:46:43 112 namino-family.csv
A table is also created on the MySQL side, intended to be joined with the files uploaded to MinIO.
create table family(
  id integer,
  name varchar(20),
  primary key(id)
);
Insert the data:
insert into family(id, name) values(1, '磯野家');
insert into family(id, name) values(2, '波野家');
insert into family(id, name) values(3, '伊佐坂');
Check:
mysql> select * from family;
+----+-----------+
| id | name |
+----+-----------+
| 1 | 磯野家 |
| 2 | 波野家 |
| 3 | 伊佐坂 |
+----+-----------+
3 rows in set (0.00 sec)
With this, the data preparation is complete.
Preparing Trino and the Apache Hive metastore service
Next, let's prepare Trino and the Apache Hive metastore service.
Starting with the Apache Hive metastore service: configure it as follows so that it can access MinIO.
conf/metastore-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<!-- These are default values meant to allow easy smoke testing of the metastore. You will likely need to add a number of new values. -->
<configuration>
  <property>
    <name>metastore.thrift.uris</name>
    <value>thrift://0.0.0.0:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
  </property>
  <property>
    <name>metastore.task.threads.always</name>
    <value>org.apache.hadoop.hive.metastore.events.EventCleanerTask</value>
  </property>
  <property>
    <name>metastore.expression.proxy</name>
    <value>org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy</value>
  </property>
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>minioadmin</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>minioadmin</value>
  </property>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>http://172.17.0.4:9000</value>
  </property>
  <property>
    <name>fs.s3a.path.style.access</name>
    <value>true</value>
  </property>
</configuration>
Start it up:
$ bin/schematool -initSchema -dbType derby
$ bin/start-metastore
Next, configure Trino.
First, the settings for Trino itself.
etc/node.properties
node.environment=container
node.id=340fae6b-55fe-486e-b122-d0fbe61d0ebb
node.data-dir=/var/lib/trino-server/data
etc/jvm.config
-server
-Xmx2G
-XX:InitialRAMPercentage=80
-XX:MaxRAMPercentage=80
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=512M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000
-XX:+UnlockDiagnosticVMOptions
-XX:+UseAESCTRIntrinsics
etc/config.properties
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
discovery.uri=http://172.17.0.2:8080
Next come the connector settings, for accessing MinIO and MySQL.
MinIO first. The catalog name is minio.
etc/catalog/minio.properties
connector.name=hive
hive.metastore.uri=thrift://172.17.0.3:9083
hive.storage-format=ORC
hive.non-managed-table-writes-enabled=true
hive.non-managed-table-creates-enabled=true
hive.s3.aws-access-key=minioadmin
hive.s3.aws-secret-key=minioadmin
hive.s3.endpoint=http://172.17.0.4:9000
hive.s3.path-style-access=true
#hive.s3select-pushdown.enabled=true
Then MySQL. The catalog name is mysql.
etc/catalog/mysql.properties
connector.name=mysql
connection-url=jdbc:mysql://172.17.0.5:3306
connection-user=kazuhira
connection-password=password
With that done, start Trino:
$ bin/launcher run
Accessing MinIO and MySQL from Trino
Now that the preparation is complete, let's access MinIO and MySQL from Trino.
Connect to Trino:
$ trino --server 172.17.0.2:8080
trino>
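As a quick sanity check (my own addition, not part of the original steps), you can list the catalogs and confirm that the minio and mysql catalogs configured above are registered; I have not reproduced the exact output here.

trino> show catalogs;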
First, create a schema pointing at MinIO.
trino> create schema minio.bucket with(location = 's3a://trino-bucket/');
CREATE SCHEMA
Create a table that refers to the CSV files uploaded to MinIO.
create table minio.bucket.people (
  family_id varchar,
  id varchar,
  first_name varchar,
  last_name varchar,
  age varchar
) with (
  format = 'csv',
  csv_separator = ',',
  csv_quote = '"',
  csv_escape = '"',
  skip_header_line_count = 1,
  external_location = 's3a://trino-bucket/files'
);
Check:
trino> select * from minio.bucket.people;
 family_id | id | first_name | last_name | age
-----------+----+------------+-----------+-----
 2 | 1 | ノリスケ | 波野 | 26
 2 | 2 | タイコ | 波野 | 22
 2 | 3 | イクラ | 波野 | 1
 1 | 1 | サザエ | フグ田 | 24
 1 | 2 | マスオ | フグ田 | 28
 1 | 3 | 波平 | 磯野 | 54
 1 | 4 | フネ | 磯野 | 50
 1 | 5 | カツオ | 磯野 | 11
 1 | 6 | ワカメ | 磯野 | 9
 1 | 7 | タラオ | フグ田 | 3
 3 | 1 | 難物 | 伊佐坂 | 60
 3 | 2 | お軽 | 伊佐坂 | 50
 3 | 3 | 甚六 | 伊佐坂 | 20
 3 | 4 | 浮江 | 伊佐 | 16
(14 rows)

Query 20220913_134818_00002_68bq6, FINISHED, 1 node
Splits: 3 total, 3 done (100.00%)
1.79 [14 rows, 450B] [7 rows/s, 251B/s]
Looks good.
Since the table on the MySQL side was created beforehand, Trino recognizes it immediately.
trino> show tables from mysql.practice;
 Table
--------
 family
(1 row)

Query 20220913_134822_00003_68bq6, FINISHED, 1 node
Splits: 7 total, 7 done (100.00%)
1.08 [1 rows, 24B] [0 rows/s, 22B/s]
We can check the table definition,
trino> desc mysql.practice.family;
 Column | Type | Extra | Comment
--------+-------------+-------+---------
 id | integer | |
 name | varchar(20) | |
(2 rows)

Query 20220913_134852_00004_68bq6, FINISHED, 1 node
Splits: 7 total, 7 done (100.00%)
0.61 [2 rows, 120B] [3 rows/s, 197B/s]
and the contents of the data as well:
trino> select * from mysql.practice.family;
 id | name
----+--------
 1 | 磯野家
 2 | 波野家
 3 | 伊佐坂
(3 rows)

Query 20220913_134903_00005_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.43 [3 rows, 0B] [6 rows/s, 0B/s]
Now, let's join these two tables.
SELECT — Trino 395 Documentation
Trino appears to support the following join types:
[ INNER ] JOIN
LEFT [ OUTER ] JOIN
RIGHT [ OUTER ] JOIN
FULL [ OUTER ] JOIN
CROSS JOIN
I wrote the following SQL,
select p.id, f.name as family_name, p.first_name, p.last_name, p.age
from minio.bucket.people as p
  inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
order by family_name, cast(p.age as integer) desc;
and ran it to check:
trino> select p.id, f.name as family_name, p.first_name, p.last_name, p.age
    -> from minio.bucket.people as p
    -> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
    -> order by family_name, cast(p.age as integer) desc;
 id | family_name | first_name | last_name | age
----+-------------+------------+-----------+-----
 1 | 伊佐坂 | 難物 | 伊佐坂 | 60
 2 | 伊佐坂 | お軽 | 伊佐坂 | 50
 3 | 伊佐坂 | 甚六 | 伊佐坂 | 20
 4 | 伊佐坂 | 浮江 | 伊佐 | 16
 1 | 波野家 | ノリスケ | 波野 | 26
 2 | 波野家 | タイコ | 波野 | 22
 3 | 波野家 | イクラ | 波野 | 1
 3 | 磯野家 | 波平 | 磯野 | 54
 4 | 磯野家 | フネ | 磯野 | 50
 2 | 磯野家 | マスオ | フグ田 | 28
 1 | 磯野家 | サザエ | フグ田 | 24
 5 | 磯野家 | カツオ | 磯野 | 11
 6 | 磯野家 | ワカメ | 磯野 | 9
 7 | 磯野家 | タラオ | フグ田 | 3
(14 rows)

Query 20220913_134916_00006_68bq6, FINISHED, 1 node
Splits: 15 total, 15 done (100.00%)
1.05 [20 rows, 606B] [19 rows/s, 578B/s]
It worked remarkably smoothly.
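Incidentally, since Trino also supports outer joins (see the list above), the same pair of tables could be combined with a LEFT OUTER JOIN as well. This is just a sketch I did not run for this post:

select f.name as family_name, p.first_name, p.last_name
from mysql.practice.family as f
  left outer join minio.bucket.people as p on f.id = cast(p.family_id as integer)
order by family_name;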
Because the table is built from CSV files this time, all of its columns are strings. That would be a problem when joining against the columns of the MySQL table, which is why the join condition uses a cast.
If you take the cast away, the types no longer match and the query fails. (In the example below I removed only the cast() call and left the as integer, so it actually fails already at the parsing stage.)
trino> select p.id, f.name as family_name, p.first_name, p.last_name, p.age
    -> from minio.bucket.people as p
    -> inner join mysql.practice.family as f on p.family_id as integer = f.id
    -> order by family_name, cast(p.age as integer) desc;
Query 20220912_162900_00017_kiyr9 failed: line 3:54: mismatched input 'as'. Expecting: '%', '*', '+', ',', '-', '.', '/', 'AND', 'AT', 'CROSS', 'EXCEPT', 'FETCH', 'FULL', 'GROUP', 'HAVING', 'INNER', 'INTERSECT', 'JOIN', 'LEFT', 'LIMIT', 'NATURAL', 'OFFSET', 'OR', 'ORDER', 'RIGHT', 'UNION', 'WHERE', 'WINDOW', '[', '||', <EOF>, <predicate>
select p.id, f.name as family_name, p.first_name, p.last_name, p.age
from minio.bucket.people as p
inner join mysql.practice.family as f on p.family_id as integer = f.id
order by family_name, cast(p.age as integer) desc
So Trino is not lenient about this sort of thing.
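As an aside, one way to avoid repeating these casts in every query would be to define a view over the CSV-backed table. This is my own idea and I have not verified it as part of this post, but the Hive connector supports Trino views, so something like the following should work:

create view minio.bucket.people_typed as
select
  cast(family_id as integer) as family_id,
  cast(id as integer) as id,
  first_name,
  last_name,
  cast(age as integer) as age
from minio.bucket.people;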
Also, while we are at it, let's run explain as well.
First, let's take a look at the table statistics.
SHOW STATS — Trino 395 Documentation
trino> show stats for minio.bucket.people;
 column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
-------------+-----------+-----------------------+----------------+-----------+-----------+------------
 family_id | NULL | NULL | NULL | NULL | NULL | NULL
 id | NULL | NULL | NULL | NULL | NULL | NULL
 first_name | NULL | NULL | NULL | NULL | NULL | NULL
 last_name | NULL | NULL | NULL | NULL | NULL | NULL
 age | NULL | NULL | NULL | NULL | NULL | NULL
 NULL | NULL | NULL | NULL | NULL | NULL | NULL
(6 rows)

Query 20220913_135248_00007_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.27 [0 rows, 0B] [0 rows/s, 0B/s]

trino> show stats for mysql.practice.family;
 column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
-------------+-----------+-----------------------+----------------+-----------+-----------+------------
 id | NULL | 2.0 | 0.0 | NULL | NULL | NULL
 name | NULL | NULL | NULL | NULL | NULL | NULL
 NULL | NULL | NULL | NULL | 3.0 | NULL | NULL
(3 rows)

Query 20220913_135305_00008_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.26 [0 rows, 0B] [0 rows/s, 0B/s]
The statistics on the MinIO side are all NULL, so let's run analyze.
ANALYZE — Trino 395 Documentation
trino> analyze minio.bucket.people;
ANALYZE: 14 rows

Query 20220913_135411_00009_68bq6, FINISHED, 1 node
Splits: 13 total, 13 done (100.00%)
1.15 [14 rows, 450B] [12 rows/s, 392B/s]

trino> analyze mysql.practice.family;
Query 20220913_135421_00010_68bq6 failed: This connector does not support analyze
The MySQL connector tells us that it does not support analyze.
Check the statistics again:
trino> show stats for minio.bucket.people;
 column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
-------------+-----------+-----------------------+----------------+-----------+-----------+------------
 family_id | 14.0 | 3.0 | 0.0 | NULL | NULL | NULL
 id | 14.0 | 7.0 | 0.0 | NULL | NULL | NULL
 first_name | 111.0 | 14.0 | 0.0 | NULL | NULL | NULL
 last_name | 102.0 | 5.0 | 0.0 | NULL | NULL | NULL
 age | 25.0 | 13.0 | 0.0 | NULL | NULL | NULL
 NULL | NULL | NULL | NULL | 14.0 | NULL | NULL
(6 rows)

Query 20220913_135513_00011_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.24 [0 rows, 0B] [0 rows/s, 0B/s]

trino> show stats for mysql.practice.family;
 column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
-------------+-----------+-----------------------+----------------+-----------+-----------+------------
 id | NULL | 2.0 | 0.0 | NULL | NULL | NULL
 name | NULL | NULL | NULL | NULL | NULL | NULL
 NULL | NULL | NULL | NULL | 3.0 | NULL | NULL
(3 rows)

Query 20220913_135516_00012_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.25 [0 rows, 0B] [0 rows/s, 0B/s]
Now the MinIO side has statistics too.
Let's run explain.
Cost in EXPLAIN — Trino 395 Documentation
The result:
trino> explain select p.id, f.name as family_name, p.first_name, p.last_name, p.age -> from minio.bucket.people as p -> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id -> order by family_name, cast(p.age as integer) desc; Query Plan ----------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [id, name, first_name, last_name, age] Output partitioning: SINGLE [] Output[columnNames = [id, family_name, first_name, last_name, age]] │ Layout: [id:varchar, name:varchar(20), first_name:varchar, last_name:varchar, age:varchar] │ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?} │ family_name := name └─ Project[] │ Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)] │ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?} └─ RemoteMerge[sourceFragmentIds = [1]] Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] Estimates: Fragment 1 [ROUND_ROBIN] Output layout: [name, last_name, id, first_name, age, expr_1] Output partitioning: SINGLE [] LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} └─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} └─ RemoteSource[sourceFragmentIds = [2]] Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] Estimates: Fragment 2 [SOURCE] Output layout: [name, last_name, id, first_name, age, expr_1] Output partitioning: ROUND_ROBIN [] Project[] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B} │ expr_1 := CAST("age" AS integer) └─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED] │ Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)] │ Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B} │ Distribution: REPLICATED ├─ Project[] │ │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint] │ │ Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B} │ │ $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0)) │ └─ ScanProject[table = minio:bucket:people] │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar] │ Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B} │ expr := CAST("family_id" AS integer) │ family_id := family_id:string:REGULAR │ last_name := last_name:string:REGULAR │ id := id:string:REGULAR │ first_name := first_name:string:REGULAR │ age := age:string:REGULAR └─ LocalExchange[partitioning = SINGLE] │ Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint] │ Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B} └─ RemoteSource[sourceFragmentIds = [3]] Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint] Estimates: Fragment 3 [SOURCE] Output 
layout: [id_0, name, $hashvalue_4] Output partitioning: BROADCAST [] ScanProject[table = mysql:practice.family practice.family] Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint] Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B} $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0)) name := name:varchar(20):VARCHAR id_0 := id:integer:INT (1 row) Query 20220913_135715_00013_68bq6, FINISHED, 1 node Splits: 1 total, 1 done (100.00%) 0.78 [0 rows, 0B] [0 rows/s, 0B/s]
explain seems to be able to do the following:
- Show a statement's logical execution plan (LOGICAL) or distributed execution plan (DISTRIBUTED)
- Validate a statement (VALIDATE)
- Show a statement's input and output (IO)
- Output in one of three formats: text, Graphviz, or JSON
By default, the distributed execution plan (DISTRIBUTED) is shown in text format.
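For example, the type and format are passed as options to explain. These are only syntax sketches; I did not capture their output for this post:

-- show the inputs and outputs of the statement, as JSON
explain (type io, format json) select * from minio.bucket.people;

-- only validate the statement
explain (type validate) select * from minio.bucket.people;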
Let's change the example. Showing the logical execution plan (LOGICAL) in text format looks like this.
trino> explain(type logical, format text) select p.id, f.name as family_name, p.first_name, p.last_name, p.age -> from minio.bucket.people as p -> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id -> order by family_name, cast(p.age as integer) desc; Query Plan ------------------------------------------------------------------------------------------------------------------------------------------------------- Output[columnNames = [id, family_name, first_name, last_name, age]] │ Layout: [id:varchar, name:varchar(20), first_name:varchar, last_name:varchar, age:varchar] │ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?} │ family_name := name └─ Project[] │ Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)] │ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?} └─ RemoteMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} └─ LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} └─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} └─ RemoteExchange[type = REPARTITION] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: 7.14k, memory: 207B, network: 1.54kB} └─ Project[] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B} │ expr_1 := CAST("age" AS integer) └─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED] │ Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)] │ Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B} │ Distribution: REPLICATED ├─ Project[] │ │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint] │ │ Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B} │ │ $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0)) │ └─ ScanProject[table = minio:bucket:people] │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar] │ Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B} │ expr := CAST("family_id" AS integer) │ family_id := family_id:string:REGULAR │ last_name := last_name:string:REGULAR │ id := id:string:REGULAR │ first_name := first_name:string:REGULAR │ age := age:string:REGULAR └─ LocalExchange[partitioning = SINGLE] │ Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint] │ Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B} └─ RemoteExchange[type = REPLICATE] │ Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint] │ Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B} └─ ScanProject[table = mysql:practice.family practice.family] Layout: [id_0:integer, name:varchar(20), 
$hashvalue_4:bigint] Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B} $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0)) name := name:varchar(20):VARCHAR id_0 := id:integer:INT (1 row) Query 20220913_140407_00015_68bq6, FINISHED, 1 node Splits: 1 total, 1 done (100.00%) 0.34 [0 rows, 0B] [0 rows/s, 0B/s]
The explain documentation describes how to read the distributed plan, so let's go through it.
Each plan fragment of the distributed plan is executed by one or more Trino nodes:
Each plan fragment of the distributed plan is executed by a single or multiple Trino nodes.
The separation into fragments represents the exchange of data between Trino nodes:
Fragments separation represent the data exchange between Trino nodes.
The fragment type specifies how a fragment is executed by Trino nodes and how data is distributed between fragments:
Fragment type specifies how the fragment is executed by Trino nodes and how the data is distributed between fragments:
The fragment types are as follows:
- SINGLE … the fragment is executed on a single node
- HASH … the fragment is executed on a fixed number of nodes, with the input data distributed using a hash function
- ROUND_ROBIN … the fragment is executed on a fixed number of nodes, with the input data distributed in a round-robin fashion
- SOURCE … the fragment is executed on nodes where the input splits are accessed
(The documentation also lists a BROADCAST type, where the input data is broadcast to all of the nodes.)
With that, let's look back at the distributed plan we produced first.
Fragment 0 [SINGLE] Output layout: [id, name, first_name, last_name, age] Output partitioning: SINGLE [] Output[columnNames = [id, family_name, first_name, last_name, age]] │ Layout: [id:varchar, name:varchar(20), first_name:varchar, last_name:varchar, age:varchar] │ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?} │ family_name := name └─ Project[] │ Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)] │ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?} └─ RemoteMerge[sourceFragmentIds = [1]] Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] Estimates: Fragment 1 [ROUND_ROBIN] Output layout: [name, last_name, id, first_name, age, expr_1] Output partitioning: SINGLE [] LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} └─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} └─ RemoteSource[sourceFragmentIds = [2]] Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] Estimates: Fragment 2 [SOURCE] Output layout: [name, last_name, id, first_name, age, expr_1] Output partitioning: ROUND_ROBIN [] Project[] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B} │ expr_1 := CAST("age" AS integer) └─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED] │ Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)] │ Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B} │ Distribution: REPLICATED ├─ Project[] │ │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint] │ │ Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B} │ │ $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0)) │ └─ ScanProject[table = minio:bucket:people] │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar] │ Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B} │ expr := CAST("family_id" AS integer) │ family_id := family_id:string:REGULAR │ last_name := last_name:string:REGULAR │ id := id:string:REGULAR │ first_name := first_name:string:REGULAR │ age := age:string:REGULAR └─ LocalExchange[partitioning = SINGLE] │ Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint] │ Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B} └─ RemoteSource[sourceFragmentIds = [3]] Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint] Estimates: Fragment 3 [SOURCE] Output layout: [id_0, name, $hashvalue_4] Output partitioning: BROADCAST [] ScanProject[table = mysql:practice.family practice.family] Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint] Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B} $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0)) name := 
name:varchar(20):VARCHAR id_0 := id:integer:INT
There are four fragments:
Fragment 0 [SINGLE]
Fragment 1 [ROUND_ROBIN]
Fragment 2 [SOURCE]
Fragment 3 [SOURCE]
Looking at the SOURCE fragments, the information about the input tables appears.
This one is the MySQL side. Looking at the ScanProject entry, the table name shows up.
Fragment 3 [SOURCE] Output layout: [id_0, name, $hashvalue_4] Output partitioning: BROADCAST [] ScanProject[table = mysql:practice.family practice.family] Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint] Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B} $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0)) name := name:varchar(20):VARCHAR id_0 := id:integer:INT
The MinIO side is a little deeper in, but it also has a ScanProject.
Fragment 2 [SOURCE] Output layout: [name, last_name, id, first_name, age, expr_1] Output partitioning: ROUND_ROBIN [] Project[] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ 〜省略〜 │ └─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED] │ Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)] │ 〜省略〜 │ ├─ Project[] │ │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint] │ │ 〜省略〜 │ │ │ └─ ScanProject[table = minio:bucket:people] │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar] │ 〜省略〜 │ └─ LocalExchange[partitioning = SINGLE] │ Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint] │ Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B} └─ RemoteSource[sourceFragmentIds = [3]] Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint] Estimates:
Looking closely, it also shows an InnerJoin with the RemoteSource.
This data is then redistributed round-robin (ROUND_ROBIN) and sorted.
Fragment 1 [ROUND_ROBIN] Output layout: [name, last_name, id, first_name, age, expr_1] Output partitioning: SINGLE [] LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} └─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} └─ RemoteSource[sourceFragmentIds = [2]] Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] Estimates:
Here you can also see that the RemoteSource is Fragment 2.
Finally, everything is merged on a single node.
Fragment 0 [SINGLE] Output layout: [id, name, first_name, last_name, age] Output partitioning: SINGLE [] Output[columnNames = [id, family_name, first_name, last_name, age]] │ Layout: [id:varchar, name:varchar(20), first_name:varchar, last_name:varchar, age:varchar] │ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?} │ family_name := name └─ Project[] │ Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)] │ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?} └─ RemoteMerge[sourceFragmentIds = [1]] Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] Estimates:
It seems you can trace the fragments by following the RemoteSource entries.
After reading it this way, looking back at the logical execution plan (LOGICAL) starts to feel quite readable.
With explain analyze, CPU time and cost can also be checked. This one appears to be fixed to the distributed execution plan.
EXPLAIN ANALYZE — Trino 395 Documentation
trino> explain analyze select p.id, f.name as family_name, p.first_name, p.last_name, p.age -> from minio.bucket.people as p -> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id -> order by family_name, cast(p.age as integer) desc; Query Plan ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ Fragment 1 [SINGLE] CPU: 3.81ms, Scheduled: 3.86ms, Blocked 542.91ms (Input: 98.34ms, Output: 0.00ns), Input: 14 rows (798B); per task: avg.: 14.00 std.dev.: 0.00, Output: 14 rows (728B) Output layout: [id, first_name, last_name, age, name] Output partitioning: SINGLE [] Project[] │ Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)] │ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?} │ CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 0.00ns (0.00%), Output: 14 rows (728B) │ Input avg.: 3.50 rows, Input std.dev.: 173.21% └─ LocalExchange[partitioning = ROUND_ROBIN] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} │ CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 334.00ms (34.15%), Output: 14 rows (798B) │ Input avg.: 14.00 rows, Input std.dev.: 0.00% └─ RemoteMerge[sourceFragmentIds = [2]] Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] Estimates: CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 98.00ms (10.02%), Output: 14 rows (798B) Input avg.: 14.00 rows, Input std.dev.: 0.00% Fragment 2 [ROUND_ROBIN] CPU: 7.88ms, Scheduled: 9.80ms, Blocked 448.91ms (Input: 326.76ms, Output: 0.00ns), Input: 14 rows (798B); per task: avg.: 14.00 std.dev.: 0.00, Output: 14 rows (798B) Output layout: [name, last_name, id, first_name, age, expr_1] Output partitioning: SINGLE [] LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} │ CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 100.00ms (10.22%), Output: 14 rows (798B) │ Input avg.: 3.50 rows, Input std.dev.: 100.00% └─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} │ CPU: 3.00ms (4.92%), Scheduled: 4.00ms (4.55%), Blocked: 0.00ns (0.00%), Output: 14 rows (798B) │ Input avg.: 3.50 rows, Input std.dev.: 100.00% └─ RemoteSource[sourceFragmentIds = [3]] Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] Estimates: CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 327.00ms (33.44%), Output: 14 rows (798B) Input avg.: 3.50 rows, Input std.dev.: 100.00% Fragment 3 [SOURCE] CPU: 40.07ms, Scheduled: 58.75ms, Blocked 119.42ms (Input: 55.91ms, Output: 0.00ns), Input: 17 rows (534B); per task: avg.: 17.00 std.dev.: 0.00, Output: 14 rows (798B) Output layout: [name, last_name, id, first_name, age, expr_1] Output partitioning: ROUND_ROBIN [] Project[] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B} 
│ CPU: 3.00ms (4.92%), Scheduled: 4.00ms (4.55%), Blocked: 0.00ns (0.00%), Output: 14 rows (798B) │ Input avg.: 4.67 rows, Input std.dev.: 36.42% │ expr_1 := CAST("age" AS integer) └─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED] │ Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)] │ Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B} │ CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 41.00ms (4.19%), Output: 14 rows (728B) │ Left (probe) Input avg.: 4.67 rows, Input std.dev.: 36.42% │ Right (build) Input avg.: 3.00 rows, Input std.dev.: 0.00% │ Collisions avg.: 0.00 (0.00% est.), Collisions std.dev.: ?% │ Distribution: REPLICATED ├─ Project[] │ │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint] │ │ Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B} │ │ CPU: 2.00ms (3.28%), Scheduled: 2.00ms (2.27%), Blocked: 0.00ns (0.00%), Output: 14 rows (728B) │ │ Input avg.: 4.67 rows, Input std.dev.: 36.42% │ │ $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0)) │ └─ ScanProject[table = minio:bucket:people] │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar] │ Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B} │ CPU: 30.00ms (49.18%), Scheduled: 47.00ms (53.41%), Blocked: 0.00ns (0.00%), Output: 14 rows (602B) │ Input avg.: 4.67 rows, Input std.dev.: 36.42% │ expr := CAST("family_id" AS integer) │ family_id := family_id:string:REGULAR │ last_name := last_name:string:REGULAR │ id := id:string:REGULAR │ first_name := first_name:string:REGULAR │ age := age:string:REGULAR │ Input: 14 rows (450B), Filtered: 0.00% └─ LocalExchange[partitioning = SINGLE] │ Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint] │ Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B} │ CPU: 0.00ns (0.00%), Scheduled: 1.00ms (1.14%), Blocked: 22.00ms (2.25%), Output: 3 rows (84B) │ Input avg.: 0.75 rows, Input std.dev.: 173.21% └─ RemoteSource[sourceFragmentIds = [4]] Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint] Estimates: CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 56.00ms (5.73%), Output: 3 rows (84B) Input avg.: 0.75 rows, Input std.dev.: 173.21% Fragment 4 [SOURCE] CPU: 18.60ms, Scheduled: 26.09ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 3 rows (0B); per task: avg.: 3.00 std.dev.: 0.00, Output: 3 rows (84B) Output layout: [id_0, name, $hashvalue_4] Output partitioning: BROADCAST [] ScanProject[table = mysql:practice.family practice.family] Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint] Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B} CPU: 18.00ms (29.51%), Scheduled: 25.00ms (28.41%), Blocked: 0.00ns (0.00%), Output: 3 rows (84B) Input avg.: 3.00 rows, Input std.dev.: 0.00% $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0)) name := name:varchar(20):VARCHAR id_0 := id:integer:INT Input: 3 rows (0B), Filtered: 0.00% (1 row) Query 20220913_142608_00016_68bq6, FINISHED, 1 node Splits: 24 total, 24 done (100.00%) 0.59 [20 rows, 606B] [34 rows/s, 1.01KB/s]
With explain analyze verbose, additional information is printed depending on the operators being used.
trino> explain analyze verbose select p.id, f.name as family_name, p.first_name, p.last_name, p.age -> from minio.bucket.people as p -> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id -> order by family_name, cast(p.age as integer) desc; Query Plan ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 1 [SINGLE] CPU: 3.29ms, Scheduled: 4.07ms, Blocked 336.69ms (Input: 66.94ms, Output: 0.00ns), Input: 14 rows (798B); per task: avg.: 14.00 std.dev.: 0.00, Output: 14 rows (728B) Output layout: [id, first_name, last_name, age, name] Output partitioning: SINGLE [] Project[] │ Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)] │ Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?} │ CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 0.00ns (0.00%), Output: 14 rows (728B) │ metrics: │ 'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=14.00, p90=14.00, p95=14.00, p99=14.00, min=0.00, max=14.00} │ Input avg.: 3.50 rows, Input std.dev.: 173.21% └─ LocalExchange[partitioning = ROUND_ROBIN] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} │ CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 270.00ms (32.93%), Output: 14 rows (798B) │ metrics: │ 'Input distribution' = {count=1.00, p01=14.00, p05=14.00, p10=14.00, p25=14.00, p50=14.00, p75=14.00, p90=14.00, p95=14.00, p99=14.00, min=14.00, max=14.00} │ Input avg.: 14.00 rows, Input std.dev.: 0.00% └─ RemoteMerge[sourceFragmentIds = [2]] Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] Estimates: CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 67.00ms (8.17%), Output: 14 rows (798B) metrics: 'Input distribution' = {count=1.00, p01=14.00, p05=14.00, p10=14.00, p25=14.00, p50=14.00, p75=14.00, p90=14.00, p95=14.00, p99=14.00, min=14.00, max=14.00} Input avg.: 14.00 rows, Input std.dev.: 0.00% Fragment 2 [ROUND_ROBIN] CPU: 5.67ms, Scheduled: 7.79ms, Blocked 270.43ms (Input: 213.22ms, Output: 0.00ns), Input: 14 rows (798B); per task: avg.: 14.00 std.dev.: 0.00, Output: 14 rows (798B) Output layout: [name, last_name, id, first_name, age, expr_1] Output partitioning: SINGLE [] LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} │ CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 57.00ms (6.95%), Output: 14 rows (798B) │ metrics: │ 'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=3.00, p75=11.00, p90=11.00, p95=11.00, p99=11.00, min=0.00, max=11.00} │ Input avg.: 3.50 rows, Input std.dev.: 128.57% └─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?} │ CPU: 2.00ms (3.77%), Scheduled: 3.00ms (3.90%), Blocked: 0.00ns (0.00%), Output: 14 rows (798B) │ metrics: │ 'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=3.00, p75=11.00, p90=11.00, p95=11.00, p99=11.00, min=0.00, max=11.00} 
│ Input avg.: 3.50 rows, Input std.dev.: 128.57% └─ RemoteSource[sourceFragmentIds = [3]] Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] Estimates: CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 213.00ms (25.98%), Output: 14 rows (798B) metrics: 'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=3.00, p75=11.00, p90=11.00, p95=11.00, p99=11.00, min=0.00, max=11.00} Input avg.: 3.50 rows, Input std.dev.: 128.57% Fragment 3 [SOURCE] CPU: 37.00ms, Scheduled: 57.84ms, Blocked 255.04ms (Input: 152.00ms, Output: 0.00ns), Input: 17 rows (534B); per task: avg.: 17.00 std.dev.: 0.00, Output: 14 rows (798B) Output layout: [name, last_name, id, first_name, age, expr_1] Output partitioning: ROUND_ROBIN [] Project[] │ Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer] │ Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B} │ CPU: 3.00ms (5.66%), Scheduled: 3.00ms (3.90%), Blocked: 0.00ns (0.00%), Output: 14 rows (798B) │ metrics: │ 'Input distribution' = {count=3.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=4.00, p75=7.00, p90=7.00, p95=7.00, p99=7.00, min=3.00, max=7.00} │ Input avg.: 4.67 rows, Input std.dev.: 36.42% │ expr_1 := CAST("age" AS integer) └─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED] │ Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)] │ Reorder joins cost : {rows: 14 (1.27kB), cpu: 3.58k, memory: 180B, network: 180B} │ Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B} │ CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 23.00ms (2.80%), Output: 14 rows (728B) │ Left (probe) metrics: │ 'Input distribution' = {count=3.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=4.00, p75=7.00, p90=7.00, p95=7.00, p99=7.00, min=3.00, max=7.00} │ Right (build) metrics: │ 'Input distribution' = {count=1.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=3.00, p75=3.00, p90=3.00, p95=3.00, p99=3.00, min=3.00, max=3.00} │ Left (probe) Input avg.: 4.67 rows, Input std.dev.: 36.42% │ Right (build) Input avg.: 3.00 rows, Input std.dev.: 0.00% │ Collisions avg.: 0.00 (0.00% est.), Collisions std.dev.: ?% │ Distribution: REPLICATED ├─ Project[] │ │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint] │ │ Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B} │ │ CPU: 2.00ms (3.77%), Scheduled: 2.00ms (2.60%), Blocked: 0.00ns (0.00%), Output: 14 rows (728B) │ │ metrics: │ │ 'Input distribution' = {count=3.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=4.00, p75=7.00, p90=7.00, p95=7.00, p99=7.00, min=3.00, max=7.00} │ │ Input avg.: 4.67 rows, Input std.dev.: 36.42% │ │ $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0)) │ └─ ScanProject[table = minio:bucket:people] │ Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar] │ Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B} │ CPU: 27.00ms (50.94%), Scheduled: 46.00ms (59.74%), Blocked: 0.00ns (0.00%), Output: 14 rows (602B) │ metrics: │ 'Input distribution' = {count=3.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=4.00, p75=7.00, p90=7.00, p95=7.00, p99=7.00, min=3.00, max=7.00} │ Input avg.: 4.67 rows, Input std.dev.: 36.42% │ expr := CAST("family_id" AS integer) │ family_id 
:= family_id:string:REGULAR │ last_name := last_name:string:REGULAR │ id := id:string:REGULAR │ first_name := first_name:string:REGULAR │ age := age:string:REGULAR │ Input: 14 rows (450B), Filtered: 0.00% └─ LocalExchange[partitioning = SINGLE] │ Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint] │ Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B} │ CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 38.00ms (4.63%), Output: 3 rows (84B) │ metrics: │ 'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=3.00, p90=3.00, p95=3.00, p99=3.00, min=0.00, max=3.00} │ Input avg.: 0.75 rows, Input std.dev.: 173.21% └─ RemoteSource[sourceFragmentIds = [4]] Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint] Estimates: CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 152.00ms (18.54%), Output: 3 rows (84B) metrics: 'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=3.00, p90=3.00, p95=3.00, p99=3.00, min=0.00, max=3.00} Input avg.: 0.75 rows, Input std.dev.: 173.21% Fragment 4 [SOURCE] CPU: 14.91ms, Scheduled: 19.48ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 3 rows (0B); per task: avg.: 3.00 std.dev.: 0.00, Output: 3 rows (84B) Output layout: [id_0, name, $hashvalue_4] Output partitioning: BROADCAST [] ScanProject[table = mysql:practice.family practice.family] Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint] Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B} CPU: 14.00ms (26.42%), Scheduled: 18.00ms (23.38%), Blocked: 0.00ns (0.00%), Output: 3 rows (84B) connector metrics: 'Physical input read time' = {duration=1.00ms} metrics: 'Input distribution' = {count=1.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=3.00, p75=3.00, p90=3.00, p95=3.00, p99=3.00, min=3.00, max=3.00} Input avg.: 3.00 rows, Input std.dev.: 0.00% $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0)) name := name:varchar(20):VARCHAR id_0 := id:integer:INT Input: 3 rows (0B), Filtered: 0.00% (1 row) Query 20220913_143008_00023_68bq6, FINISHED, 1 node Splits: 24 total, 24 done (100.00%) 0.51 [20 rows, 606B] [39 rows/s, 1.16KB/s]
Joins are also discussed on the page about cost-based optimizations.
Cost based optimizations — Trino 395 Documentation
The order in which joins are executed in a query can have a significant impact on performance; the amount of data processed and transferred is the main factor.
For this reason, with the cost-based join strategy Trino estimates the cost of each join order from the table statistics and chooses the join order with the lowest cost.
Cost based optimizations / Join enumeration
This behavior is specified with the join_reordering_strategy property, which takes one of three values.
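As a sketch of how it can be set per session (the statement below is my own example; the value names are taken from the documentation as I understand them):

-- one of AUTOMATIC (the default), ELIMINATE_CROSS_JOINS or NONE
set session join_reordering_strategy = 'AUTOMATIC';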
Trino also uses a hash-based join algorithm. This means that for each join operator, a hash table has to be built from one of the join inputs; this input is called the build side.
The other input is then iterated, and for each of its rows the hash table is queried to find matching rows; this input is called the probe side.
For distributing the join there are two strategies: Partitioned, in which each node builds a hash table from a fraction of the build-side data, and Broadcast, in which each node builds a hash table from all of the build-side data, which is replicated to every node.
Cost based optimizations / Join distribution selection
This behavior is specified with the join_distribution_type property, which also takes one of three values.
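Again as my own sketch, this one can also be set per session:

-- one of AUTOMATIC (the default), PARTITIONED or BROADCAST
set session join_distribution_type = 'AUTOMATIC';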
Both strategies involve trade-offs, and by default a cost-based choice automatically selects either a Partitioned or a Broadcast join.
In other words, how well you narrow down the data you access looks like the key point.
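For instance (my own illustration using the tables from this post; whether the filter is actually pushed down is a separate question), adding a filter keeps the amount of data that flows into the join small:

select p.first_name, p.last_name, f.name as family_name
from minio.bucket.people as p
  inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
where f.name = '磯野家';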
There is also the concept of pushdown, but I will leave that for another time.
Pushdown — Trino 395 Documentation
Summary
I used Trino to join data stored in MinIO and MySQL.
That part itself was very straightforward.
Along the way I ended up looking into explain, analyze, and how joins work, so I learned quite a lot. Personally, I found it pretty interesting.