CLOVER🍀

That was when it all began.

Spring Statemachineのタイマートリガーを使ってみる

これは、なにをしたくて書いたもの?

Spring Statemachineのトリガーには、イベントとタイマーの2種類があります。

これまでSpring Statemachineを使ってきた時にはイベントトリガーしか扱ってこなかったので、タイマートリガーを試してみたいなと思います。

トリガー

そもそもトリガーとは?というところですが、クラッシュコースに載っています。

A trigger begins a transition. Triggers can be driven by either events or timers.

Spring Statemachine / Appendices / Appendix B: State Machine Concepts / A State Machine Crash Course / Triggers

遷移を開始させるもので、イベントとトリガーの2種類があります。

イベントトリガーは、こんな感じでStateMachine#sendEventで送り込むことで起動します。

   stateMachine
        .sendEvent(Mono.just(MessageBuilder
            .withPayload("E1").build()))
        .subscribe();

イベントをステートマシンに送信するトリガーですね。

EventTrigger is the most useful trigger, because it lets you directly interact with a state machine by sending events to it.

Spring Statemachine / Using Spring Statemachine / Triggering Transitions / Using EventTrigger

イベントトリガーの定義は、以下のようにTransitionConfigurer#eventで設定します。

    @Override
    public void configure(StateMachineTransitionConfigurer<States, Events> transitions)
            throws Exception {
        transitions
            .withExternal()
                .source(States.SI).target(States.S1).event(Events.E1)
                .and()
            .withExternal()
                .source(States.S1).target(States.S2).event(Events.E2);
    }

triggerとは出てこないので、あんまり「トリガー」という印象になりにくい感じがしますね。

タイマートリガーは、ユーザーの操作なしで自動的にトリガーが起動する必要がある場合に便利なものだそうです。

TimerTrigger is useful when something needs to be triggered automatically without any user interaction.

Spring Statemachine / Using Spring Statemachine / Triggering Transitions / Using TimerTrigger

タイマートリガーは2種類で、継続的に起動するものと1度だけ起動するものがあります。

それぞれ、TransitionConfigurer#timerおよびTransitionConfigurer#timerOnceで設定します。

       transitions
            .withExternal()
                .source("S1").target("S2").event("E1")
                .and()
            .withExternal()
                .source("S1").target("S3").event("E2")
                .and()
            .withInternal()
                .source("S2")
                .action(timerAction())
                .timer(1000)
                .and()
            .withInternal()
                .source("S3")
                .action(timerAction())
                .timerOnce(1000);

では、実際に使っていってみましょう。

環境

今回の環境はこちら。

$ java --version
openjdk 17.0.4 2022-07-19
OpenJDK Runtime Environment (build 17.0.4+8-Ubuntu-120.04)
OpenJDK 64-Bit Server VM (build 17.0.4+8-Ubuntu-120.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.4, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-125-generic", arch: "amd64", family: "unix"

Spring Bootプロジェクトを作成する

それでは、Spring Bootプロジェクトを作成します。

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.6.7 \
  -d javaVersion=17 \
  -d name=statemachine-timer-triger \
  -d groupId=org.littlewings \
  -d artifactId=statemachine-timer-triger \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.statemachine \
  -d baseDir=statemachine-timer-triger | tar zxvf -

Spring Bootが2.6.7なのは、ドキュメントに習っています。

Spring Statemachine / Getting started / Using Maven

プロジェクト内に移動。

$ cd statemachine-timer-triger

自動生成されたソースコードは削除しておきます。

$ rm src/main/java/org/littlewings/spring/statemachine/StatemachineTimerTrigerApplication.java src/test/java/org/littlewings/spring/statemachine/StatemachineTimerTrigerApplicationTests.java

Maven依存関係など。

        <properties>
                <java.version>17</java.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter</artifactId>
                </dependency>

                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
        </dependencies>

        <build>
                <plugins>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
                        </plugin>
                </plugins>
        </build>

spring-boot-starterspring-statemachine-starterに変更します。

 <dependencies>
        <dependency>
            <groupId>org.springframework.statemachine</groupId>
            <artifactId>spring-statemachine-starter</artifactId>
            <version>3.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

では、ソースコードを作成していきます。

ステートを定義したenum

src/main/java/org/littlewings/spring/statemachine/States.java

package org.littlewings.spring.statemachine;

public enum States {
    INITIAL_STATE,
    STATE1,
    STATE2,
    END_STATE
}

イベントを定義したenum

src/main/java/org/littlewings/spring/statemachine/Events.java

package org.littlewings.spring.statemachine;

public enum Events {
    EVENT1,
    EVENT2,
    EVENT3
}

ステートマシンの定義。

src/main/java/org/littlewings/spring/statemachine/StateMachineConfig.java

package org.littlewings.spring.statemachine;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.action.Action;
import org.springframework.statemachine.config.EnableStateMachine;
import org.springframework.statemachine.config.EnumStateMachineConfigurerAdapter;
import org.springframework.statemachine.config.builders.StateMachineConfigurationConfigurer;
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;
import org.springframework.statemachine.trigger.TimerTrigger;

@Configuration
@EnableStateMachine
public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<States, Events> {
    @Override
    public void configure(StateMachineConfigurationConfigurer<States, Events> config)
            throws Exception {
        config
                .withConfiguration()
                .autoStartup(true)
                .machineId("my-statemachine");
    }

    @Override
    public void configure(StateMachineStateConfigurer<States, Events> states)
            throws Exception {
        states
                .withStates()
                .initial(States.INITIAL_STATE)
                .state(States.STATE1)
                .state(States.STATE2)
                .end(States.END_STATE);
    }

    @Override
    public void configure(StateMachineTransitionConfigurer<States, Events> transitions)
            throws Exception {
        transitions
                .withExternal()
                .source(States.INITIAL_STATE).target(States.STATE1)
                .timer(TimeUnit.SECONDS.toMillis(2L))
                .action(loggingAction())
                .and()
                .withExternal()
                .source(States.STATE1).target(States.STATE2)
                .timerOnce(TimeUnit.SECONDS.toMillis(4L))
                .action(loggingAction())
                .and()
                .withExternal()
                .source(States.STATE2).target(States.END_STATE)
                .timer(TimeUnit.SECONDS.toMillis(6L))
                .action(loggingAction());
    }

    @Bean
    public Action<States, Events> loggingAction() {
        return stateContext ->
                System.out.printf(
                        "[%s] state action, stage = %s, state = %s, is timer trigger = %b, event = %s%n",
                        LocalDateTime.now(),
                        stateContext.getStage(),
                        stateContext.getTarget().getId(),
                        stateContext.getTransition().getTrigger() instanceof TimerTrigger,
                        stateContext.getMessage() != null ? stateContext.getMessage().getPayload() : "[none]"
                );
    }
}

ステートマシンを使うクラス。

src/main/java/org/littlewings/spring/statemachine/Runner.java

package org.littlewings.spring.statemachine;

import java.util.concurrent.TimeUnit;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.statemachine.StateMachine;
import org.springframework.stereotype.Component;

@Component
public class Runner implements ApplicationRunner {
    StateMachine<States, Events> stateMachine;

    public Runner(StateMachine<States, Events> stateMachine) {
        this.stateMachine = stateMachine;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        TimeUnit.SECONDS.sleep(15L);

        System.out.printf("StateMachine complete? %b%n", stateMachine.isComplete());
    }
}

といっても、今回はなにもしませんが。アプリケーションがすぐに終了してしまわないようにスリープを入れているだけです。

mainメソッドを持ったクラス。

src/main/java/org/littlewings/spring/statemachine/App.java

package org.littlewings.spring.statemachine;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

アプリケーションを実行する前に、ステートマシンの設定を見返してみます。タイマートリガーの部分ですね。

TransitionConfigurer#timerTransitionConfigurer#timerOnceTransitionConfigurer#timerの順で設定しています。

    @Override
    public void configure(StateMachineTransitionConfigurer<States, Events> transitions)
            throws Exception {
        transitions
                .withExternal()
                .source(States.INITIAL_STATE).target(States.STATE1)
                .timer(TimeUnit.SECONDS.toMillis(2L))
                .action(loggingAction())
                .and()
                .withExternal()
                .source(States.STATE1).target(States.STATE2)
                .timerOnce(TimeUnit.SECONDS.toMillis(4L))
                .action(loggingAction())
                .and()
                .withExternal()
                .source(States.STATE2).target(States.END_STATE)
                .timer(TimeUnit.SECONDS.toMillis(6L))
                .action(loggingAction());
    }

2秒後 → 4秒後 → 6秒後で起動するように仕掛けています。

では、起動してみましょう。

$ mvn spring-boot:run

結果。

2022-09-14 23:59:23.477  INFO 19904 --- [           main] org.littlewings.spring.statemachine.App  : Started App in 1.183 seconds (JVM running for 1.615)
[2022-09-14T23:59:25.342022008] state action, stage = TRANSITION, state = STATE1, is timer trigger = true, event = [none]
[2022-09-14T23:59:29.348587936] state action, stage = TRANSITION, state = STATE2, is timer trigger = true, event = [none]
[2022-09-14T23:59:35.335503356] state action, stage = TRANSITION, state = END_STATE, is timer trigger = true, event = [none]
StateMachine complete? true

イベントは発生せず、トリガーはTimerTriggerインスタンスで行われていることが確認できました。

ステートマシンもしっかり終了しています。

timeとtimerOnceの違い

ところで、timertimerOnceの違いが確認できていません。

ソースコードで確認することにします。

遷移のうちexternalの定義を見てみます。

   @Override
    public ExternalTransitionConfigurer<S, E> timer(long period) {
        setPeriod(period);
        return this;
    }

    @Override
    public ExternalTransitionConfigurer<S, E> timerOnce(long period) {
        setPeriod(period);
        setCount(1);
        return this;
    }

https://github.com/spring-projects/spring-statemachine/blob/v3.2.0/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/configurers/DefaultExternalTransitionConfigurer.java#L74-L85

両者の差は、countの指定有無になっています。

そして、TimerTriggerに以下のように反映されます。

   private void schedule() {
        long initialDelay = count > 0 ? period : 0;
        Flux<Long> interval = Flux.interval(Duration.ofMillis(initialDelay), Duration.ofMillis(period))
            .doOnNext(c -> {
                notifyTriggered();
            });
        if (count > 0) {
            interval = interval.take(count);
        }
        disposable = interval.subscribe();
    }

https://github.com/spring-projects/spring-statemachine/blob/v3.2.0/spring-statemachine-core/src/main/java/org/springframework/statemachine/trigger/TimerTrigger.java#L118-L128

というわけで、こんな感じのコードを書いてみました。

src/main/java/org/littlewings/spring/statemachine/TimerTriggerEmulator.java

package org.littlewings.spring.statemachine;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@Component
public class TimerTriggerEmulator implements ApplicationRunner {
    @Override
    public void run(ApplicationArguments args) throws Exception {
        long period = 3000L;

        // timerOnce
        System.out.printf("[%s] timer trigger once start.%n", LocalDateTime.now());

        Flux<Long> timerOnceTriger = Flux.interval(Duration.ofMillis(period), Duration.ofMillis(period))
                .doOnNext(c -> System.out.printf("[%s] timer trigger once execute.%n", LocalDateTime.now()));

        timerOnceTriger = timerOnceTriger.take(1);

        timerOnceTriger.subscribe();

        // timer
        System.out.printf("[%s] timer trigger start.%n", LocalDateTime.now());

        Flux<Long> timerTriger = Flux.interval(Duration.ofMillis(0), Duration.ofMillis(period))
                .doOnNext(c -> System.out.printf("[%s] timer trigger execute.%n", LocalDateTime.now()));

        timerTriger.subscribe();

        TimeUnit.SECONDS.sleep(15L);
    }
}

3秒おきに実行、timeOncetimeの順で似せて定義。

実行すると、以下のようにtimerOnceは1回、timerは継続で実行されます。

[2022-09-15T00:22:33.947264112] timer trigger once start.
[2022-09-15T00:22:34.000809450] timer trigger start.
[2022-09-15T00:22:34.001413086] timer trigger execute.
[2022-09-15T00:22:37.000953420] timer trigger once execute.
[2022-09-15T00:22:37.001310650] timer trigger execute.
[2022-09-15T00:22:40.001455757] timer trigger execute.
[2022-09-15T00:22:43.001376982] timer trigger execute.
[2022-09-15T00:22:46.001450416] timer trigger execute.
[2022-09-15T00:22:49.001499904] timer trigger execute.

なのですが、timerの方はすぐに起動してしまっているので、この処理以外になにか細工があるんでしょうね…。
まあ、そこまでは追わないことにします。

が、timerはどう使うと効果的なんでしょうか…?1回のタイマートリガーの起動で、ステートが遷移しない可能性がある場合、とかに
使うんでしょうか。エラー発生時の考慮とかもですかね。

まとめ

Spring Statemachineのタイマートリガーを試してみました。

遷移を起こすトリガーの種類を確認しようと思って見てみたのですが、あっさり使えましたね。

timertimerOnceの使い分けがちょっと気になりますが、他の要素を勉強していくとわかるかもしれません。

Trinoで、MinIOとMySQLのデータをjoinしてアクセスしてみる

これは、なにをしたくて書いたもの?

前に、Trinoを使ってMySQLとMinIOに格納されたデータにアクセスしてみました。

分散SQLクエリーエンジン、TrinoをUbuntu Linux 20.04 LTSにインストールしてMySQLに接続してみる - CLOVER🍀

Trinoから、Hive connectorでAmazon S3互換のオブジェクトストレージMinIOにアクセスしてみる - CLOVER🍀

今回は、MinIOとMySQLに格納されたデータをjoinしてアクセスしてみたいと思います。

環境

Trinoは、395を使用します。表示しているものはCLIのバージョンですが、サーバー側も同じバージョンを使用します。

$ trino --version
Trino CLI 395

Trinoサーバーは72.17.0.2で動作させますが、起動は後で行います。

Apache Hiveメタストアサービスは、3.0.0を使用します。また、172.17.0.3で動作させます。

Apache Hiveメタストアサービスに必要な、Apache Hadoopのバージョン。

$ /opt/hadoop/bin/hadoop version
Hadoop 3.3.4
Source code repository https://github.com/apache/hadoop.git -r a585a73c3e02ac62350c136643a5e7f6095a3dbb
Compiled by stevel on 2022-07-29T12:32Z
Compiled with protoc 3.7.1
From source with checksum fb9dd8918a7b8a5b430d61af858f6ec
This command was run using /opt/hadoop/share/hadoop/common/hadoop-common-3.3.4.jar

なお、TrinoもApache Hiveメタストアサービスも以下のJavaで動作させます。

$ java --version
openjdk 17.0.4 2022-07-19
OpenJDK Runtime Environment Temurin-17.0.4+8 (build 17.0.4+8)
OpenJDK 64-Bit Server VM Temurin-17.0.4+8 (build 17.0.4+8, mixed mode, sharing)

MinIO(Amazon S3)にアクセスするための、環境変数の設定はこちら。

$ export HADOOP_HOME=/opt/hadoop

$ AWS_SDK_JAR=$(find ${HADOOP_HOME}/share/hadoop/tools/lib -name 'aws-java-sdk-bundle-*.jar')
$ HADOOP_AWS_JAR=$(find ${HADOOP_HOME}/share/hadoop/tools/lib -name 'hadoop-aws-*.jar')

$ export HADOOP_CLASSPATH=${AWS_SDK_JAR}:${HADOOP_AWS_JAR}

Apache Hiveメタストアサービスの起動は、後で行います。

MinIOのバージョンは、こちら。

$ minio --version
minio version RELEASE.2022-09-07T22-25-02Z (commit-id=bb855499e1519f31c03c9b91c0f9f10cb6439253)
Runtime: go1.18.6 linux/amd64
License: GNU AGPLv3 <https://www.gnu.org/licenses/agpl-3.0.html>
Copyright: 2015-2022 MinIO, Inc.

MinIOは以下のコマンドで起動し、172.17.0.4で動作しているものとします。

$ MINIO_ROOT_USER=minioadmin MINIO_ROOT_PASSWORD=minioadmin minio server /var/lib/minio/data --console-address :9001

MinIO操作用のAWS CLIのバージョン。

$ aws --version
aws-cli/2.7.31 Python/3.9.11 Linux/5.4.0-125-generic exe/x86_64.ubuntu.20 prompt/off

クレデンシャルの設定。

$ export AWS_ACCESS_KEY_ID=minioadmin
$ export AWS_SECRET_ACCESS_KEY=minioadmin
$ export AWS_DEFAULT_REGION=ap-northeast-1

MySQLについてはこちら。172.17.0.5で動作しているものとし、接続情報はkazuhirapasswordで、practiceというデータベースを
作成済みとします。

$ mysql --version
mysql  Ver 8.0.30 for Linux on x86_64 (MySQL Community Server - GPL)

データの準備

まずはデータの準備をしましょう。

MinIOにはCSVファイルを置くことにします。お題はサザエさんで、1行目をヘッダーにしたCSVファイルを3つ用意します。

isono-family.csv

family_id,id,first_name,last_name,age
1,1,サザエ,フグ田,24
1,2,マスオ,フグ田,28
1,3,波平,磯野,54
1,4,フネ,磯野,50
1,5,カツオ,磯野,11
1,6,ワカメ,磯野,9
1,7,タラオ,フグ田,3

namino-family.csv

family_id,id,first_name,last_name,age
2,1,ノリスケ,波野,26
2,2,タイコ,波野,22
2,3,イクラ,波野,1

isasaka-family.csv

family_id,id,first_name,last_name,age
3,1,難物,伊佐坂,60
3,2,お軽,伊佐坂,50
3,3,甚六,伊佐坂,20
3,4,浮江,伊佐,16

MinIOのエンドポイントはこちら。

$ MINIO_ENDPOINT=http://172.17.0.4:9000

バケットを作成して

$ aws s3 mb --endpoint-url $MINIO_ENDPOINT s3://trino-bucket

syncでアップロード。

$ aws s3 sync --endpoint-url $MINIO_ENDPOINT . s3://trino-bucket/files
upload: ./isono-family.csv to s3://trino-bucket/files/isono-family.csv
upload: ./isasaka-family.csv to s3://trino-bucket/files/isasaka-family.csv
upload: ./namino-family.csv to s3://trino-bucket/files/namino-family.csv

確認。

$ aws s3 ls --endpoint-url $MINIO_ENDPOINT trino-bucket/files/
2022-09-13 22:46:43        131 isasaka-family.csv
2022-09-13 22:46:43        207 isono-family.csv
2022-09-13 22:46:43        112 namino-family.csv

MySQL側にもテーブルを作成します。MinIOにアップロードしたファイルとjoinする想定のものです。

create table family(
  id integer,
  name varchar(20),
  primary key(id)
);

データの登録。

insert into family(id, name) values(1, '磯野家');
insert into family(id, name) values(2, '波野家');
insert into family(id, name) values(3, '伊佐坂');

確認。

mysql> select * from family;
+----+-----------+
| id | name      |
+----+-----------+
|  1 | 磯野家    |
|  2 | 波野家    |
|  3 | 伊佐坂    |
+----+-----------+
3 rows in set (0.00 sec)

これで、データの準備は完了です。

TrinoとApache Hiveメタストアサービスの準備

続いては、TrinoおよびApache Hiveメタストアサービスの準備を行います。

まずはApache Hiveメタストアサービスから行いましょう。MinIOにアクセスするため、以下のように設定。

conf/metastore-site.xml

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?><!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
-->
<!-- These are default values meant to allow easy smoke testing of the metastore.  You will
likely need to add a number of new values. -->
<configuration>
  <property>
    <name>metastore.thrift.uris</name>
    <value>thrift://0.0.0.0:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
  </property>
  <property>
    <name>metastore.task.threads.always</name>
    <value>org.apache.hadoop.hive.metastore.events.EventCleanerTask</value>
  </property>
  <property>
    <name>metastore.expression.proxy</name>
    <value>org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy</value>
  </property>

  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>minioadmin</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>minioadmin</value>
  </property>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>http://172.17.0.4:9000</value>
  </property>
  <property>
    <name>fs.s3a.path.style.access</name>
    <value>true</value>
  </property>
</configuration>

起動。

$ bin/schematool -initSchema -dbType derby
$ bin/start-metastore

次に、Trinoの設定を行います。

Trino自体の設定。

etc/node.properties

node.environment=container
node.id=340fae6b-55fe-486e-b122-d0fbe61d0ebb
node.data-dir=/var/lib/trino-server/data

etc/jvm.config

-server
-Xmx2G
-XX:InitialRAMPercentage=80
-XX:MaxRAMPercentage=80
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=512M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000
-XX:+UnlockDiagnosticVMOptions
-XX:+UseAESCTRIntrinsics

etc/config.properties

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
discovery.uri=http://172.17.0.2:8080

続いて、Connectorの設定です。MinIOおよびMySQLにアクセスするための設定を行います。

MinIO。カタログ名はminioとしています。

etc/catalog/minio.properties

connector.name=hive
hive.metastore.uri=thrift://172.17.0.3:9083
hive.storage-format=ORC
hive.non-managed-table-writes-enabled=true
hive.non-managed-table-creates-enabled=true

hive.s3.aws-access-key=minioadmin
hive.s3.aws-secret-key=minioadmin
hive.s3.endpoint=http://172.17.0.4:9000
hive.s3.path-style-access=true
#hive.s3select-pushdown.enabled=true

MySQL。カタログ名はmysqlとしています。

etc/catalog/mysql.properties

connector.name=mysql
connection-url=jdbc:mysql://172.17.0.5:3306
connection-user=kazuhira
connection-password=password

これで、Trinoを起動。

$ bin/launcher run

TrinoからMinIOとMySQLにアクセスしてみる

準備が完了したので、TrinoからMinIOおよびMySQLにアクセスしてみましょう。

Trinoに接続。

$ trino --server 172.17.0.2:8080
trino>

まず、MinIOに向けてスキーマを作成します。

trino> create schema minio.bucket with(location = 's3a://trino-bucket/');
CREATE SCHEMA

MinIOのアップロードしたCSVファイルを参照するように、テーブルを作成。

create table minio.bucket.people (
  family_id varchar,
  id varchar,
  first_name varchar,
  last_name varchar,
  age varchar
) with (
  format = 'csv',
  csv_separator = ',',
  csv_quote = '"',
  csv_escape = '"',
  skip_header_line_count = 1,
  external_location = 's3a://trino-bucket/files'
);

確認。

trino> select * from minio.bucket.people;
 family_id | id | first_name | last_name | age
-----------+----+------------+-----------+-----
 2         | 1  | ノリスケ   | 波野      | 26
 2         | 2  | タイコ     | 波野      | 22
 2         | 3  | イクラ     | 波野      | 1
 1         | 1  | サザエ     | フグ田    | 24
 1         | 2  | マスオ     | フグ田    | 28
 1         | 3  | 波平       | 磯野      | 54
 1         | 4  | フネ       | 磯野      | 50
 1         | 5  | カツオ     | 磯野      | 11
 1         | 6  | ワカメ     | 磯野      | 9
 1         | 7  | タラオ     | フグ田    | 3
 3         | 1  | 難物       | 伊佐坂    | 60
 3         | 2  | お軽       | 伊佐坂    | 50
 3         | 3  | 甚六       | 伊佐坂    | 20
 3         | 4  | 浮江       | 伊佐      | 16
(14 rows)

Query 20220913_134818_00002_68bq6, FINISHED, 1 node
Splits: 3 total, 3 done (100.00%)
1.79 [14 rows, 450B] [7 rows/s, 251B/s]

OKですね。

MySQL側は、あらかじめテーブルを作成しているため、Trinoからはすぐに認識できます。

trino> show tables from mysql.practice;
 Table
--------
 family
(1 row)

Query 20220913_134822_00003_68bq6, FINISHED, 1 node
Splits: 7 total, 7 done (100.00%)
1.08 [1 rows, 24B] [0 rows/s, 22B/s]

テーブル定義や

trino> desc mysql.practice.family;
 Column |    Type     | Extra | Comment
--------+-------------+-------+---------
 id     | integer     |       |
 name   | varchar(20) |       |
(2 rows)

Query 20220913_134852_00004_68bq6, FINISHED, 1 node
Splits: 7 total, 7 done (100.00%)
0.61 [2 rows, 120B] [3 rows/s, 197B/s]

データの中身も確認できますね。

trino> select * from mysql.practice.family;
 id |  name
----+--------
  1 | 磯野家
  2 | 波野家
  3 | 伊佐坂
(3 rows)

Query 20220913_134903_00005_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.43 [3 rows, 0B] [6 rows/s, 0B/s]

では、この2つのテーブルをjoinしてみましょう。

SELECT — Trino 395 Documentation

Trinoでは、以下のjoinの種類をサポートしているようです。

[ INNER ] JOIN
LEFT [ OUTER ] JOIN
RIGHT [ OUTER ] JOIN
FULL [ OUTER ] JOIN
CROSS JOIN

SQLを作成して

select p.id, f.name as family_name, p.first_name, p.last_name, p.age
from minio.bucket.people as p
inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
order by family_name, cast(p.age as integer) desc;

確認。

trino> select p.id, f.name as family_name, p.first_name, p.last_name, p.age
    -> from minio.bucket.people as p
    -> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
    -> order by family_name, cast(p.age as integer) desc;
 id | family_name | first_name | last_name | age
----+-------------+------------+-----------+-----
 1  | 伊佐坂      | 難物       | 伊佐坂    | 60
 2  | 伊佐坂      | お軽       | 伊佐坂    | 50
 3  | 伊佐坂      | 甚六       | 伊佐坂    | 20
 4  | 伊佐坂      | 浮江       | 伊佐      | 16
 1  | 波野家      | ノリスケ   | 波野      | 26
 2  | 波野家      | タイコ     | 波野      | 22
 3  | 波野家      | イクラ     | 波野      | 1
 3  | 磯野家      | 波平       | 磯野      | 54
 4  | 磯野家      | フネ       | 磯野      | 50
 2  | 磯野家      | マスオ     | フグ田    | 28
 1  | 磯野家      | サザエ     | フグ田    | 24
 5  | 磯野家      | カツオ     | 磯野      | 11
 6  | 磯野家      | ワカメ     | 磯野      | 9
 7  | 磯野家      | タラオ     | フグ田    | 3
(14 rows)

Query 20220913_134916_00006_68bq6, FINISHED, 1 node
Splits: 15 total, 15 done (100.00%)
1.05 [20 rows, 606B] [19 rows/s, 578B/s]

すごくあっさり動きました。

今回CSVファイルからテーブルを作っているので、すべてのカラムが文字列になっています。このままだとMySQLにあるテーブルの列と
joinする際に困るのでキャストしていたのですが。

もしもキャストをやめた場合は、以下のように型が合わずにエラーになります。

trino> select p.id, f.name as family_name, p.first_name, p.last_name, p.age
    -> from minio.bucket.people as p
    -> inner join mysql.practice.family as f on p.family_id as integer = f.id
    -> order by family_name, cast(p.age as integer) desc;
Query 20220912_162900_00017_kiyr9 failed: line 3:54: mismatched input 'as'. Expecting: '%', '*', '+', ',', '-', '.', '/', 'AND', 'AT', 'CROSS', 'EXCEPT', 'FETCH', 'FULL', 'GROUP', 'HAVING', 'INNER', 'INTERSECT', 'JOIN', 'LEFT', 'LIMIT', 'NATURAL', 'OFFSET', 'OR', 'ORDER', 'RIGHT', 'UNION', 'WHERE', 'WINDOW', '[', '||', <EOF>, <predicate>
select p.id, f.name as family_name, p.first_name, p.last_name, p.age
from minio.bucket.people as p
inner join mysql.practice.family as f on p.family_id as integer = f.id
order by family_name, cast(p.age as integer) desc

このあたりは、ゆるっとはいかないようですね。

あと、せっかくなのでexplainも行ってみましょう。

先に統計情報を見ておきます。

SHOW STATS — Trino 395 Documentation

trino> show stats for minio.bucket.people;
 column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
-------------+-----------+-----------------------+----------------+-----------+-----------+------------
 family_id   |      NULL |                  NULL |           NULL |      NULL | NULL      | NULL
 id          |      NULL |                  NULL |           NULL |      NULL | NULL      | NULL
 first_name  |      NULL |                  NULL |           NULL |      NULL | NULL      | NULL
 last_name   |      NULL |                  NULL |           NULL |      NULL | NULL      | NULL
 age         |      NULL |                  NULL |           NULL |      NULL | NULL      | NULL
 NULL        |      NULL |                  NULL |           NULL |      NULL | NULL      | NULL
(6 rows)

Query 20220913_135248_00007_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.27 [0 rows, 0B] [0 rows/s, 0B/s]

trino> show stats for mysql.practice.family;
 column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
-------------+-----------+-----------------------+----------------+-----------+-----------+------------
 id          |      NULL |                   2.0 |            0.0 |      NULL | NULL      | NULL
 name        |      NULL |                  NULL |           NULL |      NULL | NULL      | NULL
 NULL        |      NULL |                  NULL |           NULL |       3.0 | NULL      | NULL
(3 rows)

Query 20220913_135305_00008_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.26 [0 rows, 0B] [0 rows/s, 0B/s]

MinIO側の統計情報は全部nullですね。

analyzeしてみます。

ANALYZE — Trino 395 Documentation

trino> analyze minio.bucket.people;
ANALYZE: 14 rows

Query 20220913_135411_00009_68bq6, FINISHED, 1 node
Splits: 13 total, 13 done (100.00%)
1.15 [14 rows, 450B] [12 rows/s, 392B/s]

trino> analyze mysql.practice.family;
Query 20220913_135421_00010_68bq6 failed: This connector does not support analyze

MySQL connectorはanalyzeをサポートしていないと言われました。

再度統計情報を確認。

trino> show stats for minio.bucket.people;
 column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
-------------+-----------+-----------------------+----------------+-----------+-----------+------------
 family_id   |      14.0 |                   3.0 |            0.0 |      NULL | NULL      | NULL
 id          |      14.0 |                   7.0 |            0.0 |      NULL | NULL      | NULL
 first_name  |     111.0 |                  14.0 |            0.0 |      NULL | NULL      | NULL
 last_name   |     102.0 |                   5.0 |            0.0 |      NULL | NULL      | NULL
 age         |      25.0 |                  13.0 |            0.0 |      NULL | NULL      | NULL
 NULL        |      NULL |                  NULL |           NULL |      14.0 | NULL      | NULL
(6 rows)

Query 20220913_135513_00011_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.24 [0 rows, 0B] [0 rows/s, 0B/s]

trino> show stats for mysql.practice.family;
 column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
-------------+-----------+-----------------------+----------------+-----------+-----------+------------
 id          |      NULL |                   2.0 |            0.0 |      NULL | NULL      | NULL
 name        |      NULL |                  NULL |           NULL |      NULL | NULL      | NULL
 NULL        |      NULL |                  NULL |           NULL |       3.0 | NULL      | NULL
(3 rows)

Query 20220913_135516_00012_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.25 [0 rows, 0B] [0 rows/s, 0B/s]

MinIO側にも、統計情報が入りました。

では、explainしてみます。

Cost in EXPLAIN — Trino 395 Documentation

結果。

trino> explain select p.id, f.name as family_name, p.first_name, p.last_name, p.age
    -> from minio.bucket.people as p
    -> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
    -> order by family_name, cast(p.age as integer) desc;
                                                               Query Plan
-----------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [id, name, first_name, last_name, age]
     Output partitioning: SINGLE []
     Output[columnNames = [id, family_name, first_name, last_name, age]]
     │   Layout: [id:varchar, name:varchar(20), first_name:varchar, last_name:varchar, age:varchar]
     │   Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
     │   family_name := name
     └─ Project[]
        │   Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)]
        │   Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
        └─ RemoteMerge[sourceFragmentIds = [1]]
               Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
               Estimates:

 Fragment 1 [ROUND_ROBIN]
     Output layout: [name, last_name, id, first_name, age, expr_1]
     Output partitioning: SINGLE []
     LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
     │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
     │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
     └─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
        │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
        │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
        └─ RemoteSource[sourceFragmentIds = [2]]
               Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
               Estimates:

 Fragment 2 [SOURCE]
     Output layout: [name, last_name, id, first_name, age, expr_1]
     Output partitioning: ROUND_ROBIN []
     Project[]
     │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
     │   Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B}
     │   expr_1 := CAST("age" AS integer)
     └─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED]
        │   Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)]
        │   Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B}
        │   Distribution: REPLICATED
        ├─ Project[]
        │  │   Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint]
        │  │   Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B}
        │  │   $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0))
        │  └─ ScanProject[table = minio:bucket:people]
        │         Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar]
        │         Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B}
        │         expr := CAST("family_id" AS integer)
        │         family_id := family_id:string:REGULAR
        │         last_name := last_name:string:REGULAR
        │         id := id:string:REGULAR
        │         first_name := first_name:string:REGULAR
        │         age := age:string:REGULAR
        └─ LocalExchange[partitioning = SINGLE]
           │   Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint]
           │   Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B}
           └─ RemoteSource[sourceFragmentIds = [3]]
                  Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint]
                  Estimates:

 Fragment 3 [SOURCE]
     Output layout: [id_0, name, $hashvalue_4]
     Output partitioning: BROADCAST []
     ScanProject[table = mysql:practice.family practice.family]
         Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint]
         Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B}
         $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
         name := name:varchar(20):VARCHAR
         id_0 := id:integer:INT


(1 row)

Query 20220913_135715_00013_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.78 [0 rows, 0B] [0 rows/s, 0B/s]

explainは、以下のことが可能なようです。

デフォルトは分散実行計画(DISTRIBUTED)をテキスト形式で表示します。

実行例を変えてみましょう。論理実行計画(LOGICAL)をテキスト形式で表示すると、こんな感じになりました。

trino> explain(type logical, format text) select p.id, f.name as family_name, p.first_name, p.last_name, p.age
    -> from minio.bucket.people as p
    -> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
    -> order by family_name, cast(p.age as integer) desc;
                                                                      Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------
 Output[columnNames = [id, family_name, first_name, last_name, age]]
 │   Layout: [id:varchar, name:varchar(20), first_name:varchar, last_name:varchar, age:varchar]
 │   Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
 │   family_name := name
 └─ Project[]
    │   Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)]
    │   Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
    └─ RemoteMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
       │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
       │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
       └─ LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
          │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
          │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
          └─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
             │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
             │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
             └─ RemoteExchange[type = REPARTITION]
                │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
                │   Estimates: {rows: 14 (1.34kB), cpu: 7.14k, memory: 207B, network: 1.54kB}
                └─ Project[]
                   │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
                   │   Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B}
                   │   expr_1 := CAST("age" AS integer)
                   └─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED]
                      │   Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)]
                      │   Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B}
                      │   Distribution: REPLICATED
                      ├─ Project[]
                      │  │   Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint]
                      │  │   Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B}
                      │  │   $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0))
                      │  └─ ScanProject[table = minio:bucket:people]
                      │         Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar]
                      │         Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B}
                      │         expr := CAST("family_id" AS integer)
                      │         family_id := family_id:string:REGULAR
                      │         last_name := last_name:string:REGULAR
                      │         id := id:string:REGULAR
                      │         first_name := first_name:string:REGULAR
                      │         age := age:string:REGULAR
                      └─ LocalExchange[partitioning = SINGLE]
                         │   Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint]
                         │   Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B}
                         └─ RemoteExchange[type = REPLICATE]
                            │   Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint]
                            │   Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B}
                            └─ ScanProject[table = mysql:practice.family practice.family]
                                   Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint]
                                   Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B}
                                   $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
                                   name := name:varchar(20):VARCHAR
                                   id_0 := id:integer:INT

(1 row)

Query 20220913_140407_00015_68bq6, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.34 [0 rows, 0B] [0 rows/s, 0B/s]

explainの説明には、分散実行計画の読み方が書かれているのでこちらを見てみましょう。

分散プランの各プランフラグメントは、ひとつまたは複数ノードでのTrinoノードで実行されることを表しています。

Each plan fragment of the distributed plan is executed by a single or multiple Trino nodes.

フラグメントが分かれているということは、Trinoノード間でデータの交換が行われることを表しています。

Fragments separation represent the data exchange between Trino nodes.

フラグメントの種類は、フラグメントがTrinoノードによってどのように実行されるか、そしてどのようにフラグメント間でデータが分散されるかを
示しています。

Fragment type specifies how the fragment is executed by Trino nodes and how the data is distributed between fragments:

フラグメントの種類は、次の4つがあります。

  • SINGLE … フラグメントは単一のノードで実行される
  • HASH … フラグメントは、ハッシュ関数を使用して分散した入力データを使用し、固定数のノードで実行される
  • ROUND_ROBIN … フラグメントは、入力データをすべてのノードにブロードキャストし、固定数のノードで実行される
  • SOURCE … フラグメントは、分割入力にアクセスできるノードで実行される

ここで、最初に実行した分散実行計画を見返してみます。

 Fragment 0 [SINGLE]
     Output layout: [id, name, first_name, last_name, age]
     Output partitioning: SINGLE []
     Output[columnNames = [id, family_name, first_name, last_name, age]]
     │   Layout: [id:varchar, name:varchar(20), first_name:varchar, last_name:varchar, age:varchar]
     │   Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
     │   family_name := name
     └─ Project[]
        │   Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)]
        │   Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
        └─ RemoteMerge[sourceFragmentIds = [1]]
               Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
               Estimates:

 Fragment 1 [ROUND_ROBIN]
     Output layout: [name, last_name, id, first_name, age, expr_1]
     Output partitioning: SINGLE []
     LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
     │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
     │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
     └─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
        │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
        │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
        └─ RemoteSource[sourceFragmentIds = [2]]
               Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
               Estimates:

 Fragment 2 [SOURCE]
     Output layout: [name, last_name, id, first_name, age, expr_1]
     Output partitioning: ROUND_ROBIN []
     Project[]
     │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
     │   Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B}
     │   expr_1 := CAST("age" AS integer)
     └─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED]
        │   Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)]
        │   Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B}
        │   Distribution: REPLICATED
        ├─ Project[]
        │  │   Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint]
        │  │   Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B}
        │  │   $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0))
        │  └─ ScanProject[table = minio:bucket:people]
        │         Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar]
        │         Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B}
        │         expr := CAST("family_id" AS integer)
        │         family_id := family_id:string:REGULAR
        │         last_name := last_name:string:REGULAR
        │         id := id:string:REGULAR
        │         first_name := first_name:string:REGULAR
        │         age := age:string:REGULAR
        └─ LocalExchange[partitioning = SINGLE]
           │   Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint]
           │   Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B}
           └─ RemoteSource[sourceFragmentIds = [3]]
                  Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint]
                  Estimates:

 Fragment 3 [SOURCE]
     Output layout: [id_0, name, $hashvalue_4]
     Output partitioning: BROADCAST []
     ScanProject[table = mysql:practice.family practice.family]
         Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint]
         Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B}
         $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
         name := name:varchar(20):VARCHAR
         id_0 := id:integer:INT

フラグメントは4つあります。

 Fragment 0 [SINGLE]

 Fragment 1 [ROUND_ROBIN]

 Fragment 2 [SOURCE]

 Fragment 3 [SOURCE]

SOURCEを見ると、入力となるテーブルの情報が現れます。

こちらはMySQL側。ScanProjectという項目を見ると、テーブル名が出ていますね。

 Fragment 3 [SOURCE]
     Output layout: [id_0, name, $hashvalue_4]
     Output partitioning: BROADCAST []
     ScanProject[table = mysql:practice.family practice.family]
         Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint]
         Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B}
         $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
         name := name:varchar(20):VARCHAR
         id_0 := id:integer:INT

MinIOの方はちょっと奥ですが、こちらもScanProjectがあります。

 Fragment 2 [SOURCE]
     Output layout: [name, last_name, id, first_name, age, expr_1]
     Output partitioning: ROUND_ROBIN []
     Project[]
     │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
     │   〜省略〜
     │   
     └─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED]
        │   Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)]
        │   〜省略〜
        │   
        ├─ Project[]
        │  │   Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint]
        │  │   〜省略〜
        │  │   
        │  └─ ScanProject[table = minio:bucket:people]
        │         Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar]
        │         〜省略〜
        │         
        └─ LocalExchange[partitioning = SINGLE]
           │   Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint]
           │   Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B}
           └─ RemoteSource[sourceFragmentIds = [3]]
                  Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint]
                  Estimates:

よく見ると、RemoteSourceInnerJoinしていることも書かれていますね。

そして、このデータをROUND_ROGIN(ブロードキャスト)してソート。

 Fragment 1 [ROUND_ROBIN]
     Output layout: [name, last_name, id, first_name, age, expr_1]
     Output partitioning: SINGLE []
     LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
     │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
     │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
     └─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
        │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
        │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
        └─ RemoteSource[sourceFragmentIds = [2]]
               Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
               Estimates:

ここでは、RemoteSourceがFragment 2であることも書かれています。

最後に単一ノードにマージ、という感じですね。

 Fragment 0 [SINGLE]
     Output layout: [id, name, first_name, last_name, age]
     Output partitioning: SINGLE []
     Output[columnNames = [id, family_name, first_name, last_name, age]]
     │   Layout: [id:varchar, name:varchar(20), first_name:varchar, last_name:varchar, age:varchar]
     │   Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
     │   family_name := name
     └─ Project[]
        │   Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)]
        │   Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
        └─ RemoteMerge[sourceFragmentIds = [1]]
               Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
               Estimates:

フラグメントは、RemoteSourceを見ていくとわかるような気がします。

こう見た後に論理実行計画(LOGICAL)を見返すと、読めそうな感じがしてきますね。

explain analyzeを使うと、CPU時間やコストも確認できるようです。こちらは分散実行計画固定のようです。

EXPLAIN ANALYZE — Trino 395 Documentation

trino> explain analyze select p.id, f.name as family_name, p.first_name, p.last_name, p.age
    -> from minio.bucket.people as p
    -> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
    -> order by family_name, cast(p.age as integer) desc;
                                                                                  Query Plan
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 1 [SINGLE]
     CPU: 3.81ms, Scheduled: 3.86ms, Blocked 542.91ms (Input: 98.34ms, Output: 0.00ns), Input: 14 rows (798B); per task: avg.: 14.00 std.dev.: 0.00, Output: 14 rows (728B)
     Output layout: [id, first_name, last_name, age, name]
     Output partitioning: SINGLE []
     Project[]
     │   Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)]
     │   Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
     │   CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 0.00ns (0.00%), Output: 14 rows (728B)
     │   Input avg.: 3.50 rows, Input std.dev.: 173.21%
     └─ LocalExchange[partitioning = ROUND_ROBIN]
        │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
        │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
        │   CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 334.00ms (34.15%), Output: 14 rows (798B)
        │   Input avg.: 14.00 rows, Input std.dev.: 0.00%
        └─ RemoteMerge[sourceFragmentIds = [2]]
               Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
               Estimates:
               CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 98.00ms (10.02%), Output: 14 rows (798B)
               Input avg.: 14.00 rows, Input std.dev.: 0.00%

 Fragment 2 [ROUND_ROBIN]
     CPU: 7.88ms, Scheduled: 9.80ms, Blocked 448.91ms (Input: 326.76ms, Output: 0.00ns), Input: 14 rows (798B); per task: avg.: 14.00 std.dev.: 0.00, Output: 14 rows (798B)
     Output layout: [name, last_name, id, first_name, age, expr_1]
     Output partitioning: SINGLE []
     LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
     │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
     │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
     │   CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 100.00ms (10.22%), Output: 14 rows (798B)
     │   Input avg.: 3.50 rows, Input std.dev.: 100.00%
     └─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
        │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
        │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
        │   CPU: 3.00ms (4.92%), Scheduled: 4.00ms (4.55%), Blocked: 0.00ns (0.00%), Output: 14 rows (798B)
        │   Input avg.: 3.50 rows, Input std.dev.: 100.00%
        └─ RemoteSource[sourceFragmentIds = [3]]
               Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
               Estimates:
               CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 327.00ms (33.44%), Output: 14 rows (798B)
               Input avg.: 3.50 rows, Input std.dev.: 100.00%

 Fragment 3 [SOURCE]
     CPU: 40.07ms, Scheduled: 58.75ms, Blocked 119.42ms (Input: 55.91ms, Output: 0.00ns), Input: 17 rows (534B); per task: avg.: 17.00 std.dev.: 0.00, Output: 14 rows (798B)
     Output layout: [name, last_name, id, first_name, age, expr_1]
     Output partitioning: ROUND_ROBIN []
     Project[]
     │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
     │   Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B}
     │   CPU: 3.00ms (4.92%), Scheduled: 4.00ms (4.55%), Blocked: 0.00ns (0.00%), Output: 14 rows (798B)
     │   Input avg.: 4.67 rows, Input std.dev.: 36.42%
     │   expr_1 := CAST("age" AS integer)
     └─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED]
        │   Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)]
        │   Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B}
        │   CPU: 1.00ms (1.64%), Scheduled: 1.00ms (1.14%), Blocked: 41.00ms (4.19%), Output: 14 rows (728B)
        │   Left (probe) Input avg.: 4.67 rows, Input std.dev.: 36.42%
        │   Right (build) Input avg.: 3.00 rows, Input std.dev.: 0.00%
        │   Collisions avg.: 0.00 (0.00% est.), Collisions std.dev.: ?%
        │   Distribution: REPLICATED
        ├─ Project[]
        │  │   Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint]
        │  │   Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B}
        │  │   CPU: 2.00ms (3.28%), Scheduled: 2.00ms (2.27%), Blocked: 0.00ns (0.00%), Output: 14 rows (728B)
        │  │   Input avg.: 4.67 rows, Input std.dev.: 36.42%
        │  │   $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0))
        │  └─ ScanProject[table = minio:bucket:people]
        │         Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar]
        │         Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B}
        │         CPU: 30.00ms (49.18%), Scheduled: 47.00ms (53.41%), Blocked: 0.00ns (0.00%), Output: 14 rows (602B)
        │         Input avg.: 4.67 rows, Input std.dev.: 36.42%
        │         expr := CAST("family_id" AS integer)
        │         family_id := family_id:string:REGULAR
        │         last_name := last_name:string:REGULAR
        │         id := id:string:REGULAR
        │         first_name := first_name:string:REGULAR
        │         age := age:string:REGULAR
        │         Input: 14 rows (450B), Filtered: 0.00%
        └─ LocalExchange[partitioning = SINGLE]
           │   Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint]
           │   Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B}
           │   CPU: 0.00ns (0.00%), Scheduled: 1.00ms (1.14%), Blocked: 22.00ms (2.25%), Output: 3 rows (84B)
           │   Input avg.: 0.75 rows, Input std.dev.: 173.21%
           └─ RemoteSource[sourceFragmentIds = [4]]
                  Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint]
                  Estimates:
                  CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 56.00ms (5.73%), Output: 3 rows (84B)
                  Input avg.: 0.75 rows, Input std.dev.: 173.21%

 Fragment 4 [SOURCE]
     CPU: 18.60ms, Scheduled: 26.09ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 3 rows (0B); per task: avg.: 3.00 std.dev.: 0.00, Output: 3 rows (84B)
     Output layout: [id_0, name, $hashvalue_4]
     Output partitioning: BROADCAST []
     ScanProject[table = mysql:practice.family practice.family]
         Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint]
         Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B}
         CPU: 18.00ms (29.51%), Scheduled: 25.00ms (28.41%), Blocked: 0.00ns (0.00%), Output: 3 rows (84B)
         Input avg.: 3.00 rows, Input std.dev.: 0.00%
         $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
         name := name:varchar(20):VARCHAR
         id_0 := id:integer:INT
         Input: 3 rows (0B), Filtered: 0.00%


(1 row)

Query 20220913_142608_00016_68bq6, FINISHED, 1 node
Splits: 24 total, 24 done (100.00%)
0.59 [20 rows, 606B] [34 rows/s, 1.01KB/s]

explain analyze verboseを使うと、使用しているオペレーターによっては追加情報を出力してくれるようです。

trino> explain analyze verbose select p.id, f.name as family_name, p.first_name, p.last_name, p.age
    -> from minio.bucket.people as p
    -> inner join mysql.practice.family as f on cast(p.family_id as integer) = f.id
    -> order by family_name, cast(p.age as integer) desc;
                                                                                  Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 1 [SINGLE]
     CPU: 3.29ms, Scheduled: 4.07ms, Blocked 336.69ms (Input: 66.94ms, Output: 0.00ns), Input: 14 rows (798B); per task: avg.: 14.00 std.dev.: 0.00, Output: 14 rows (728B)
     Output layout: [id, first_name, last_name, age, name]
     Output partitioning: SINGLE []
     Project[]
     │   Layout: [id:varchar, first_name:varchar, last_name:varchar, age:varchar, name:varchar(20)]
     │   Estimates: {rows: 14 (1.27kB), cpu: ?, memory: ?, network: ?}
     │   CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 0.00ns (0.00%), Output: 14 rows (728B)
     │   metrics:
     │     'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=14.00, p90=14.00, p95=14.00, p99=14.00, min=0.00, max=14.00}
     │   Input avg.: 3.50 rows, Input std.dev.: 173.21%
     └─ LocalExchange[partitioning = ROUND_ROBIN]
        │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
        │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
        │   CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 270.00ms (32.93%), Output: 14 rows (798B)
        │   metrics:
        │     'Input distribution' = {count=1.00, p01=14.00, p05=14.00, p10=14.00, p25=14.00, p50=14.00, p75=14.00, p90=14.00, p95=14.00, p99=14.00, min=14.00, max=14.00}
        │   Input avg.: 14.00 rows, Input std.dev.: 0.00%
        └─ RemoteMerge[sourceFragmentIds = [2]]
               Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
               Estimates:
               CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 67.00ms (8.17%), Output: 14 rows (798B)
               metrics:
                 'Input distribution' = {count=1.00, p01=14.00, p05=14.00, p10=14.00, p25=14.00, p50=14.00, p75=14.00, p90=14.00, p95=14.00, p99=14.00, min=14.00, max=14.00}
               Input avg.: 14.00 rows, Input std.dev.: 0.00%

 Fragment 2 [ROUND_ROBIN]
     CPU: 5.67ms, Scheduled: 7.79ms, Blocked 270.43ms (Input: 213.22ms, Output: 0.00ns), Input: 14 rows (798B); per task: avg.: 14.00 std.dev.: 0.00, Output: 14 rows (798B)
     Output layout: [name, last_name, id, first_name, age, expr_1]
     Output partitioning: SINGLE []
     LocalMerge[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
     │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
     │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
     │   CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 57.00ms (6.95%), Output: 14 rows (798B)
     │   metrics:
     │     'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=3.00, p75=11.00, p90=11.00, p95=11.00, p99=11.00, min=0.00, max=11.00}
     │   Input avg.: 3.50 rows, Input std.dev.: 128.57%
     └─ PartialSort[orderBy = [name ASC NULLS LAST, expr_1 DESC NULLS LAST]]
        │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
        │   Estimates: {rows: 14 (1.34kB), cpu: ?, memory: ?, network: ?}
        │   CPU: 2.00ms (3.77%), Scheduled: 3.00ms (3.90%), Blocked: 0.00ns (0.00%), Output: 14 rows (798B)
        │   metrics:
        │     'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=3.00, p75=11.00, p90=11.00, p95=11.00, p99=11.00, min=0.00, max=11.00}
        │   Input avg.: 3.50 rows, Input std.dev.: 128.57%
        └─ RemoteSource[sourceFragmentIds = [3]]
               Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
               Estimates:
               CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 213.00ms (25.98%), Output: 14 rows (798B)
               metrics:
                 'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=3.00, p75=11.00, p90=11.00, p95=11.00, p99=11.00, min=0.00, max=11.00}
               Input avg.: 3.50 rows, Input std.dev.: 128.57%

 Fragment 3 [SOURCE]
     CPU: 37.00ms, Scheduled: 57.84ms, Blocked 255.04ms (Input: 152.00ms, Output: 0.00ns), Input: 17 rows (534B); per task: avg.: 17.00 std.dev.: 0.00, Output: 14 rows (798B)
     Output layout: [name, last_name, id, first_name, age, expr_1]
     Output partitioning: ROUND_ROBIN []
     Project[]
     │   Layout: [name:varchar(20), last_name:varchar, id:varchar, first_name:varchar, age:varchar, expr_1:integer]
     │   Estimates: {rows: 14 (1.34kB), cpu: 5.80k, memory: 207B, network: 207B}
     │   CPU: 3.00ms (5.66%), Scheduled: 3.00ms (3.90%), Blocked: 0.00ns (0.00%), Output: 14 rows (798B)
     │   metrics:
     │     'Input distribution' = {count=3.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=4.00, p75=7.00, p90=7.00, p95=7.00, p99=7.00, min=3.00, max=7.00}
     │   Input avg.: 4.67 rows, Input std.dev.: 36.42%
     │   expr_1 := CAST("age" AS integer)
     └─ InnerJoin[criteria = ("expr" = "id_0"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED]
        │   Layout: [last_name:varchar, id:varchar, first_name:varchar, age:varchar, name:varchar(20)]
        │   Reorder joins cost : {rows: 14 (1.27kB), cpu: 3.58k, memory: 180B, network: 180B}
        │   Estimates: {rows: 14 (1.27kB), cpu: 4.46k, memory: 207B, network: 207B}
        │   CPU: 1.00ms (1.89%), Scheduled: 1.00ms (1.30%), Blocked: 23.00ms (2.80%), Output: 14 rows (728B)
        │   Left (probe) metrics:
        │     'Input distribution' = {count=3.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=4.00, p75=7.00, p90=7.00, p95=7.00, p99=7.00, min=3.00, max=7.00}
        │   Right (build) metrics:
        │     'Input distribution' = {count=1.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=3.00, p75=3.00, p90=3.00, p95=3.00, p99=3.00, min=3.00, max=3.00}
        │   Left (probe) Input avg.: 4.67 rows, Input std.dev.: 36.42%
        │   Right (build) Input avg.: 3.00 rows, Input std.dev.: 0.00%
        │   Collisions avg.: 0.00 (0.00% est.), Collisions std.dev.: ?%
        │   Distribution: REPLICATED
        ├─ Project[]
        │  │   Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar, $hashvalue:bigint]
        │  │   Estimates: {rows: 14 (728B), cpu: 1.90k, memory: 0B, network: 0B}
        │  │   CPU: 2.00ms (3.77%), Scheduled: 2.00ms (2.60%), Blocked: 0.00ns (0.00%), Output: 14 rows (728B)
        │  │   metrics:
        │  │     'Input distribution' = {count=3.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=4.00, p75=7.00, p90=7.00, p95=7.00, p99=7.00, min=3.00, max=7.00}
        │  │   Input avg.: 4.67 rows, Input std.dev.: 36.42%
        │  │   $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0))
        │  └─ ScanProject[table = minio:bucket:people]
        │         Layout: [last_name:varchar, expr:integer, id:varchar, first_name:varchar, age:varchar]
        │         Estimates: {rows: 14 (602B), cpu: 616, memory: 0B, network: 0B}/{rows: 14 (602B), cpu: 1.19k, memory: 0B, network: 0B}
        │         CPU: 27.00ms (50.94%), Scheduled: 46.00ms (59.74%), Blocked: 0.00ns (0.00%), Output: 14 rows (602B)
        │         metrics:
        │           'Input distribution' = {count=3.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=4.00, p75=7.00, p90=7.00, p95=7.00, p99=7.00, min=3.00, max=7.00}
        │         Input avg.: 4.67 rows, Input std.dev.: 36.42%
        │         expr := CAST("family_id" AS integer)
        │         family_id := family_id:string:REGULAR
        │         last_name := last_name:string:REGULAR
        │         id := id:string:REGULAR
        │         first_name := first_name:string:REGULAR
        │         age := age:string:REGULAR
        │         Input: 14 rows (450B), Filtered: 0.00%
        └─ LocalExchange[partitioning = SINGLE]
           │   Layout: [id_0:integer, name:varchar(20), $hashvalue_2:bigint]
           │   Estimates: {rows: 3 (207B), cpu: 387, memory: 0B, network: 207B}
           │   CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 38.00ms (4.63%), Output: 3 rows (84B)
           │   metrics:
           │     'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=3.00, p90=3.00, p95=3.00, p99=3.00, min=0.00, max=3.00}
           │   Input avg.: 0.75 rows, Input std.dev.: 173.21%
           └─ RemoteSource[sourceFragmentIds = [4]]
                  Layout: [id_0:integer, name:varchar(20), $hashvalue_3:bigint]
                  Estimates:
                  CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 152.00ms (18.54%), Output: 3 rows (84B)
                  metrics:
                    'Input distribution' = {count=4.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=3.00, p90=3.00, p95=3.00, p99=3.00, min=0.00, max=3.00}
                  Input avg.: 0.75 rows, Input std.dev.: 173.21%

 Fragment 4 [SOURCE]
     CPU: 14.91ms, Scheduled: 19.48ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 3 rows (0B); per task: avg.: 3.00 std.dev.: 0.00, Output: 3 rows (84B)
     Output layout: [id_0, name, $hashvalue_4]
     Output partitioning: BROADCAST []
     ScanProject[table = mysql:practice.family practice.family]
         Layout: [id_0:integer, name:varchar(20), $hashvalue_4:bigint]
         Estimates: {rows: 3 (207B), cpu: 180, memory: 0B, network: 0B}/{rows: 3 (207B), cpu: 387, memory: 0B, network: 0B}
         CPU: 14.00ms (26.42%), Scheduled: 18.00ms (23.38%), Blocked: 0.00ns (0.00%), Output: 3 rows (84B)
         connector metrics:
           'Physical input read time' = {duration=1.00ms}
         metrics:
           'Input distribution' = {count=1.00, p01=3.00, p05=3.00, p10=3.00, p25=3.00, p50=3.00, p75=3.00, p90=3.00, p95=3.00, p99=3.00, min=3.00, max=3.00}
         Input avg.: 3.00 rows, Input std.dev.: 0.00%
         $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
         name := name:varchar(20):VARCHAR
         id_0 := id:integer:INT
         Input: 3 rows (0B), Filtered: 0.00%


(1 row)

Query 20220913_143008_00023_68bq6, FINISHED, 1 node
Splits: 24 total, 24 done (100.00%)
0.51 [20 rows, 606B] [39 rows/s, 1.16KB/s]

joinの話は、コストベースの最適化に関するページにも書かれています。

Cost based optimizations — Trino 395 Documentation

クエリーでjoinが実行される順序は、パフォーマンスに大きな影響を与える可能性があります。処理されるデータ量、転送されるデータ量が
大きな要因になります。

このためコストベースのjoin戦略の場合、Trinoではテーブルの統計情報からjoinの順序のコストを見積もり、最もコストが低いjoinの順序を
選択します。

Cost based optimizations / Join enumeration

この挙動は、join_reordering_strategyプロパティで3つの値から指定することになります。

また、Trinoではjoinにハッシュベースのjoinアルゴリズムを使用しますが、これはjoin演算子ごとに、入力ごとにハッシュテーブルを作成する
必要があることを意味します。これをビルド側と呼ぶようです。

もうひとつの入力では、各行ごとにハッシュテーブルをクエリーします。こちらをプローブ側と呼びます。

joinの際には、データの一部からハッシュテーブルを構築するPartitionedと、全データからハッシュテーブルを作成して各ノードに複製する
Broadcastの2つの戦略があります。

Cost based optimizations / Join distribution selection

この挙動は、join_distribution_typeプロパティで3つの値から指定することになります。

いずれの戦略にもトレードオフがあり、デフォルトではコストベースの選択で自動的にPartitionedかBroadcastかのいずれかのjoinを選択します。

つまり、いかにしてデータを絞り込んでアクセスするかが重要になりそうな感じですね。

あとはプッシュダウンという概念もあるようですが、こちらはまたいずれ。

Pushdown — Trino 395 Documentation

まとめ

Trinoを使って、MinIOとMySQLのデータをjoinしてアクセスしてみました。

これ自体は非常に簡単にいきました。

合わせて、explainanalyze、joinの話を見ることになったのでいろいろと勉強になりましたね。個人的には、けっこう興味を持てました。