CLOVER🍀

That was when it all began.

MySQL 8.0に、LOAD DATA、Parallel Table Import Utility(MySQL Shell)、JDBCでCSVロードしてみる

これは、なにをしたくて書いたもの?

MySQLにCSVロードをしようとした時に、LOAD DATAを使ったり、ふつうにINSERT文を使ったりといくつか方法が
あると思うのですが、どれくらい違うものだろう?ということで試してみることにしました。

お題

CSVファイルを用意して、以下の方法を試してみます。

  • LOAD DATA(正確には、LOAD DATA LOCAL)
  • Parallel Table Import Utility(MySQL Shell)
  • CSVを読み込み、INSERT文を実行するJavaプログラム
    • シンプルにINSERTを実行
    • JDBCでのバッチ更新
    • rewriteBatchedStatementsをtrueにして、バッチ更新

データには、Stack Overflowのユーザー情報を4カラムのみ、50万件だけ切り出してCSVファイルにしたものを使います。

あまりデータ量自体は大したことがないのですが、そんなに気長に待てなかったからです…。

最初に、それぞれの方法について書いておきます。

LOAD DATA

LOAD DATAは、テキストファイルからMySQLのテーブルへデータをロードする文です。

SQL ステートメント / LOAD DATA ステートメント

通常のINSERT文よりも、20倍速いそうです。

テキストファイルからテーブルをロードする場合は、LOAD DATA を使用します。 通常、これは INSERT ステートメントを使用する場合より、20 倍速くなります。

最適化 / INSERT ステートメントの最適化

ただし、クライアント側にあるファイルをロードする場合(LOAD DATA LOCAL)は、セキュリティ上の注意点があり、
デフォルトでは無効になっています。この機能を使うためには、クライアント/サーバー両方で有効にする必要があります。

セキュリティー / LOAD DATA LOCAL のセキュリティー上の考慮事項

Parallel Table Import Utility

Parallel Table Import Utilityは、MySQL Shellの機能です。

MySQL :: MySQL Shell 8.0 :: 8.4 Parallel Table Import Utility

この機能は、入力となるテキストファイルを解析、チャンクに分割してMySQLにデータを並列にロードすることができます。

内部的にはLOAD DATAが並列で実行されるため、通常のLOAD DATAがシングルスレッドで動作するのに対して
大幅に高速化を見込むことができるようです。

並列度は、スレッド数の指定およびチャンクサイズから導出されます。

min{max{1, threads}, chunks}}

LOAD DATAよりも速くなるかどうかは、使用するデータファイルのサイズに依存するところもあるので、計測あるのみですね。

その他、入力の前処理として変換ができたり、複数のファイルを使ったインポートができたり、圧縮されたファイルを
ロードしたりもできるようです。

rewriteBatchedStatements

MySQLには、INSERT文のVALUES句に値を複数書き、1度のINSERT文で複数の行を登録する機能があります。
※バルクINSERT

最適化 / InnoDB テーブルの一括データロード

Connector/Jでは、rewriteBatchedStatementsというプロパティをtrueにするとJDBCのバッチ更新をバルクINSERTに
書き換えて実行してくれます。

MySQL :: MySQL Connector/J 8.0 Developer Guide :: 6.3.13 Performance Extensions

結果

先に結果だけ書いておくと、ふつうにINSERT文を実行するよりも、LOAD DATAを使ったりParallel Table Import Utilityを
使う方が圧倒的に速いです。

LOAD DATAとParallel Table Import Utilityについては、今回のデータ量ではほぼ差がありませんでした。
Parallel Table Import Utilityが、若干速いくらいです。扱ったデータ量が小さかったですね…。

JDBCでのINSERT文実行は、rewriteBatchedStatementsをtrueにしたバルクINSERTの方が、他の方法の倍以上速い結果に
なりました。

サマリは、こんな感じです。

50万行のCSVに対して。

  • Parallel Table Import Utility … 6秒程度
  • LOAD DATA … 7〜9秒程度
  • バルクINSERT … 30秒程度
  • 単純なINSERT、およびバッチ更新 … 80秒程度

バルクINSERTおよびバッチ更新のバッチサイズは、10にしています。

では、実際に作成したソースコードなどを記載していきます。

環境

今回の環境は、こちらです。

mysql> select version();
+-----------+
| version() |
+-----------+
| 8.0.23    |
+-----------+
1 row in set (0.01 sec)

MySQLサーバーは、172.17.0.2で動作しているものとします。チューニングは特にしておらず、ほぼデフォルト値です。

MySQL Shell。

$ mysqlsh --version
/path/to/mysqlsh   Ver 8.0.23 for Linux on x86_64 - for MySQL 8.0.23 (MySQL Community Server (GPL))

また、Javaに関する環境情報はこちら。

$ java --version
openjdk 11.0.10 2021-01-19
OpenJDK Runtime Environment (build 11.0.10+9-Ubuntu-0ubuntu1.20.04)
OpenJDK 64-Bit Server VM (build 11.0.10+9-Ubuntu-0ubuntu1.20.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.10, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-72-generic", arch: "amd64", family: "unix"

テーブル定義とデータ作成

DDLは、こんな感じで用意。

create table account (
  id int,
  name varchar(50),
  registered datetime,
  about varchar(255),
  primary key(id)
);

データはStack Overflowのユーザーを使いました。

$ curl -OL https://archive.org/download/stackexchange/stackoverflow.com-Users.7z
$ 7z x stackoverflow.com-Users.7z

$ ll -h Users.xml
-rw-rw-r-- 1 xxxxx xxxxx 4.4G  3月  1 13:31 Users.xml

$ wc -l Users.xml
14080582 Users.xml

中身がXMLなことと、ちょっと大きいのでデータ抽出と件数を絞り込むのにプログラムを作成してCSVに変換しました。

$ groovy users_to_csv.groovy Users.xml 500000

$ ll -h users_500000.csv
-rw-rw-r-- 1 xxxxx xxxxx 32M  4月 17 23:00 users_500000.csv

ここで使ったプログラムは、最後に載せます。users_500000.csvというのが、データロードに使うCSVファイルです。

ロードしたデータは、次のクエリの実行前にはTRUNCATEしているものとします。

mysql> truncate table account;
Query OK, 0 rows affected (0.21 sec)

では、進めていきましょう。

LOAD DATA

まずはLOAD DATAからやってみます。正確には、LOAD DATA LOCALですが。

SQL ステートメント / LOAD DATA ステートメント

mysqlコマンドを起動。

$ mysql -umysql_user -p -h172.17.0.2 practice
Enter password: 
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 22
Server version: 8.0.23 MySQL Community Server - GPL

Copyright (c) 2000, 2021, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> 

実行しようとすると、エラーになります。

mysql> load data local infile 'users_500000.csv' into table account fields terminated by ',' optionally enclosed by '"' escaped by '\\';
ERROR 3948 (42000): Loading local data is disabled; this must be enabled on both the client and server sides

エラーメッセージの通り、クライアントのローカルデータをクライアントとサーバーそれぞれで有効にする必要があります、と。

こちらのドキュメントに記載されている内容です。

セキュリティー / LOAD DATA LOCAL のセキュリティー上の考慮事項

サーバー側は、my.cnfのmysqldセクションでlocal-infileを設定するか、set global local_infileで設定します。

今回はset global local_infileで有効にすることにしました。

mysql> set global local_infile = on;
Query OK, 0 rows affected (0.00 sec)

再度実行…すると、エラーメッセージが変わります。

mysql> load data local infile 'users_500000.csv' into table account fields terminated by ',' optionally enclosed by '"' escaped by '\\';
ERROR 2068 (HY000): LOAD DATA LOCAL INFILE file request rejected due to restrictions on access.

クライアント側も、この機能を有効にする必要があるからです。

my.cnfのclientセクションでlocal-infileを設定するか、mysqlコマンドの起動時に--local-infileで設定します。

今回は、mysqlコマンドのオプションで指定しました。

$ mysql -umysql_user -p -h172.17.0.2 --local-infile=on practice

これでロードし直すと、今度は成功しました。

mysql> load data local infile 'users_500000.csv' into table account fields terminated by ',' optionally enclosed by '"' escaped by '\\';
Query OK, 500000 rows affected (7.73 sec)
Records: 500000  Deleted: 0  Skipped: 0  Warnings: 0

Parallel Table Import Utility

次は、Parallel Table Import Utilityです。

MySQL :: MySQL Shell 8.0 :: 8.4 Parallel Table Import Utility

こちらは、MySQL Shellの機能になります。

mysqlshコマンドから直接実行する方法と、JavaScriptやPythonで実行する方法があります。

どちらの方法を使うにしろ、内部的には結局LOAD DATAを使うので、こちらを有効にしておく必要があります。

mysql> set global local_infile = on;
Query OK, 0 rows affected (0.00 sec)

まずはmysqlshコマンドで実行してみましょう。

$ mysqlsh mysql_user@172.17.0.2:3306 -- util import-table users_500000.csv --schema=practice --table=account --fieldsTerminatedBy=',' --fieldsEnclosedBy='"' --fieldsOptionallyEnclosed=true --fieldsEscapedBy='\' --bytesPerChunk=10M
Please provide the password for 'mysql_user@172.17.0.2:3306': ********
Importing from file '/path/to/users_500000.csv' to table `practice`.`account` in MySQL Server at 172.17.0.2:3306 using 4 threads
[Worker003] users_500000.csv: Records: 43164  Deleted: 0  Skipped: 0  Warnings: 0
[Worker002] users_500000.csv: Records: 131723  Deleted: 0  Skipped: 0  Warnings: 0
[Worker001] users_500000.csv: Records: 163859  Deleted: 0  Skipped: 0  Warnings: 0
[Worker000] users_500000.csv: Records: 161254  Deleted: 0  Skipped: 0  Warnings: 0
100% (32.62 MB / 32.62 MB), 5.31 MB/s
File '/path/to/users_500000.csv' (32.62 MB) was imported in 6.1474 sec at 5.31 MB/s
Total rows affected in practice.account: Records: 500000  Deleted: 0  Skipped: 0  Warnings: 0

fields〜オプションの意味は、LOAD DATAと同じです。schemaでデータ登録先のデータベース名、tableでテーブル名を
指定します。

ドキュメントを見ると、複数のファイルを一気に指定することもできるようです。

何回か試してみましたが、LOAD DATAよりも気持ち速いですが、そこまで差はありません。対象のファイルサイズが
小さいようですね。

なお、今回は4つのスレッドが使われていました。

using 4 threads

スレッド数を決定する変数は2つあり、ひとつは今回指定したbytesPerChunk、もうひとつはthreadsです。

bytesPerChunkのデフォルト値は50Mで、threadsのデフォルト値は8です。

ドキュメントの計算式によると、1とthreadsの大きい方と、チャンクサイズのうち、小さい方が選択されます。

min{max{1, threads}, chunks}}

今回はチャンクサイズを10Mにして、ファイルサイズが32Mだったので4になり、threadsのデフォルト値8よりも小さいので
4スレッドになったということですね。

$ ll -h users_500000.csv
-rw-rw-r-- 1 xxxxx xxxxx 32M  4月 17 23:00 users_500000.csv

ところで、mysql_user:password@172.17.0.2:3306みたいな記述にすると、パスワードも指定して実行できるようです。

JavaScriptでも実行してみましょう。まずはmysqlshコマンドでログイン。

$ mysqlsh mysql_user@172.17.0.2:3306
Please provide the password for 'mysql_user@172.17.0.2:3306': ********
MySQL Shell 8.0.23

Copyright (c) 2016, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its affiliates.
Other names may be trademarks of their respective owners.

Type '\help' or '\?' for help; '\quit' to exit.
Creating a session to 'mysql_user@172.17.0.2:3306'
Fetching schema names for autocompletion... Press ^C to stop.
Your MySQL connection id is 59
Server version: 8.0.23 MySQL Community Server - GPL
No default schema selected; type \use <schema> to set one.
 MySQL  172.17.0.2:3306 ssl  JS > 

実行。

 MySQL  172.17.0.2:3306 ssl  JS > util.importTable('users_500000.csv', {schema: 'practice', table: 'account', fieldsTerminatedBy: ',', fieldsEnclosedBy: '"', fieldsOptionallyEnclosed: true, fieldsEscapedBy: '\\', bytesPerChunk: '10M'})
Importing from file '/path/to/users_500000.csv' to table `practice`.`account` in MySQL Server at 172.17.0.2:3306 using 4 threads
[Worker002] users_500000.csv: Records: 43164  Deleted: 0  Skipped: 0  Warnings: 0
[Worker003] users_500000.csv: Records: 131723  Deleted: 0  Skipped: 0  Warnings: 0
[Worker001] users_500000.csv: Records: 161254  Deleted: 0  Skipped: 0  Warnings: 0
[Worker000] users_500000.csv: Records: 163859  Deleted: 0  Skipped: 0  Warnings: 0
100% (32.62 MB / 32.62 MB), 5.89 MB/s
File ''/path/to/users_500000.csv' (32.62 MB) was imported in 5.5399 sec at 5.89 MB/s
Total rows affected in practice.account: Records: 500000  Deleted: 0  Skipped: 0  Warnings: 0

オプションなどの意味は、先ほどと変わらないので割愛。

JDBC

ここからは、Javaでプログラムを書いていきます。

Maven依存関係と、プラグインの設定。

    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.23</version>
        </dependency>

        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>5.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.littlewings.mysql.Importer</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

実行可能JARファイルを作成します。

プログラムの雛形は、こんな感じです。

src/main/java/org/littlewings/mysql/Importer.java

package org.littlewings.mysql;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import com.opencsv.CSVReader;
import com.opencsv.exceptions.CsvValidationException;

public class Importer {
    String mode;
    String filePath;

    public static void main(String... args) throws SQLException, IOException, CsvValidationException {
        Importer importer = new Importer(args[0], args[1]);
        importer.execute();
    }

    public Importer(String mode, String filePath) {
        this.mode = mode;
        this.filePath = filePath;
    }

    public void execute() throws SQLException, IOException, CsvValidationException {
        switch (mode) {
            case "simple":
                simpleInsert();
                break;
            case "batch":
                batchInsert();
                break;
            case "batch-rewrite":
                batchRewriteInsert();
                break;
            default:
                System.out.printf("unknown mode [%s]%n", mode);
                break;
        }
    }

    〜省略〜

    private void log(String format, Object... args) {
        System.out.printf("[%s] %s%n", LocalDateTime.now(), String.format(format, args));
    }
}

実行する種類によって、simple、batch、batch-rewriteの3つを用意しました。

パッケージングしたら

$ mvn package

こんな感じで実行します。

$ java -jar target/mysql-import-0.0.1-SNAPSHOT.jar [種類] users_500000.csv

では、それぞれ種類ごとに書いていきます。

単純にINSERT文を実行する

単純にINSERT文を実行するパターンのコードは、こちら。

    private void simpleInsert() throws SQLException, IOException, CsvValidationException {
        Properties properties = new Properties();
        properties.setProperty("user", "mysql_user");
        properties.setProperty("password", "password");

        int commitThreshold = 10000;
        int count = 0;

        long startTime = System.currentTimeMillis();
        long commitStartTime = System.currentTimeMillis();

        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

        try (CSVReader csvReader = new CSVReader(Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8));
             Connection conn = DriverManager.getConnection("jdbc:mysql://172.17.0.2:3306/practice", properties);
             PreparedStatement statement = conn.prepareStatement("insert into account(id, name, registered, about) values(?, ?, ?, ?)")) {
            conn.setAutoCommit(false);

            String[] columns;

            while ((columns = csvReader.readNext()) != null) {
                count++;

                statement.setInt(1, Integer.parseInt(columns[0]));
                statement.setString(2, columns[1]);
                statement.setTimestamp(3, Timestamp.valueOf(LocalDateTime.parse(columns[2], formatter)));
                statement.setString(4, columns[3]);

                statement.executeUpdate();

                if (count % commitThreshold == 0) {
                    long committedTime = System.currentTimeMillis() - commitStartTime;
                    log("commit, %d records, lap time = %f sec...", count, committedTime * 1.0 / 1000);
                    conn.commit();

                    commitStartTime = System.currentTimeMillis();
                }
            }

            conn.commit();

            long elapsedTime = System.currentTimeMillis() - startTime;
            log("%d records inserted, elapsed time = %f sec", count, elapsedTime * 1.0 / 1000);
        }
    }

CSVファイルを1行ずつ読んでINSERTし、10,000件ごとにコミットするようにしました。

実行の様子は、こちら。

$ java -jar target/mysql-import-0.0.1-SNAPSHOT.jar simple users_500000.csv
[2021-04-18T02:08:52.365205] commit, 10000 records, lap time = 3.998000 sec...
[2021-04-18T02:08:54.063781] commit, 20000 records, lap time = 1.643000 sec...
[2021-04-18T02:08:55.629334] commit, 30000 records, lap time = 1.521000 sec...
[2021-04-18T02:08:57.112967] commit, 40000 records, lap time = 1.431000 sec...
[2021-04-18T02:08:58.513753] commit, 50000 records, lap time = 1.357000 sec...
[2021-04-18T02:09:00.039572] commit, 60000 records, lap time = 1.455000 sec...
[2021-04-18T02:09:01.461116] commit, 70000 records, lap time = 1.380000 sec...
[2021-04-18T02:09:02.883278] commit, 80000 records, lap time = 1.379000 sec...
[2021-04-18T02:09:04.375070] commit, 90000 records, lap time = 1.431000 sec...
[2021-04-18T02:09:05.878608] commit, 100000 records, lap time = 1.464000 sec...

〜省略〜

[2021-04-18T02:09:50.253484] commit, 400000 records, lap time = 1.357000 sec...
[2021-04-18T02:09:51.730666] commit, 410000 records, lap time = 1.402000 sec...
[2021-04-18T02:09:53.159438] commit, 420000 records, lap time = 1.367000 sec...
[2021-04-18T02:09:54.605684] commit, 430000 records, lap time = 1.394000 sec...
[2021-04-18T02:09:56.128087] commit, 440000 records, lap time = 1.459000 sec...
[2021-04-18T02:09:57.562830] commit, 450000 records, lap time = 1.381000 sec...
[2021-04-18T02:09:59.001128] commit, 460000 records, lap time = 1.368000 sec...
[2021-04-18T02:10:00.460900] commit, 470000 records, lap time = 1.386000 sec...
[2021-04-18T02:10:01.823628] commit, 480000 records, lap time = 1.308000 sec...
[2021-04-18T02:10:03.394428] commit, 490000 records, lap time = 1.480000 sec...
[2021-04-18T02:10:04.834426] commit, 500000 records, lap time = 1.366000 sec...
[2021-04-18T02:10:04.919687] 500000 records inserted, elapsed time = 76.556000 sec
バッチ更新

次は、バッチ更新。

    private void batchInsert() throws SQLException, IOException, CsvValidationException {
        Properties properties = new Properties();
        properties.setProperty("user", "mysql_user");
        properties.setProperty("password", "password");

        int commitThreshold = 10000;
        int batchThreshold = 10;
        int count = 0;

        long startTime = System.currentTimeMillis();
        long commitStartTime = System.currentTimeMillis();

        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

        try (CSVReader csvReader = new CSVReader(Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8));
             Connection conn = DriverManager.getConnection("jdbc:mysql://172.17.0.2:3306/practice", properties);
             PreparedStatement statement = conn.prepareStatement("insert into account(id, name, registered, about) values(?, ?, ?, ?)")) {
            conn.setAutoCommit(false);

            String[] columns;

            while ((columns = csvReader.readNext()) != null) {
                count++;

                statement.setInt(1, Integer.parseInt(columns[0]));
                statement.setString(2, columns[1]);
                statement.setTimestamp(3, Timestamp.valueOf(LocalDateTime.parse(columns[2], formatter)));
                statement.setString(4, columns[3]);

                statement.addBatch();

                if (count % batchThreshold == 0) {
                    statement.executeBatch();
                }

                if (count % commitThreshold == 0) {
                    long committedTime = System.currentTimeMillis() - commitStartTime;
                    log("commit, %d records, lap time = %f sec...", count, committedTime * 1.0 / 1000);
                    conn.commit();

                    commitStartTime = System.currentTimeMillis();
                }
            }

            conn.commit();

            long elapsedTime = System.currentTimeMillis() - startTime;
            log("%d records inserted, elapsed time = %f sec", count, elapsedTime * 1.0 / 1000);
        }
    }

先ほどとは違って、10件ごとにバッチ更新するようにしました。

                statement.addBatch();

                if (count % batchThreshold == 0) {
                    statement.executeBatch();
                }

実行の様子。

$ java -jar target/mysql-import-0.0.1-SNAPSHOT.jar batch users_500000.csv
[2021-04-18T02:11:18.837457] commit, 10000 records, lap time = 4.386000 sec...
[2021-04-18T02:11:20.909568] commit, 20000 records, lap time = 2.016000 sec...
[2021-04-18T02:11:22.369608] commit, 30000 records, lap time = 1.411000 sec...
[2021-04-18T02:11:23.981233] commit, 40000 records, lap time = 1.564000 sec...
[2021-04-18T02:11:25.655595] commit, 50000 records, lap time = 1.605000 sec...
[2021-04-18T02:11:27.154502] commit, 60000 records, lap time = 1.452000 sec...
[2021-04-18T02:11:28.629899] commit, 70000 records, lap time = 1.435000 sec...
[2021-04-18T02:11:30.353573] commit, 80000 records, lap time = 1.684000 sec...
[2021-04-18T02:11:31.992758] commit, 90000 records, lap time = 1.599000 sec...
[2021-04-18T02:11:33.567422] commit, 100000 records, lap time = 1.529000 sec...

〜省略〜

[2021-04-18T02:12:17.328760] commit, 400000 records, lap time = 1.351000 sec...
[2021-04-18T02:12:18.712159] commit, 410000 records, lap time = 1.329000 sec...
[2021-04-18T02:12:20.172459] commit, 420000 records, lap time = 1.415000 sec...
[2021-04-18T02:12:21.583530] commit, 430000 records, lap time = 1.346000 sec...
[2021-04-18T02:12:23.183495] commit, 440000 records, lap time = 1.525000 sec...
[2021-04-18T02:12:24.645953] commit, 450000 records, lap time = 1.409000 sec...
[2021-04-18T02:12:26.059431] commit, 460000 records, lap time = 1.355000 sec...
[2021-04-18T02:12:27.586332] commit, 470000 records, lap time = 1.427000 sec...
[2021-04-18T02:12:29.061952] commit, 480000 records, lap time = 1.403000 sec...
[2021-04-18T02:12:30.537175] commit, 490000 records, lap time = 1.425000 sec...
[2021-04-18T02:12:31.900668] commit, 500000 records, lap time = 1.290000 sec...
[2021-04-18T02:12:31.957996] 500000 records inserted, elapsed time = 77.511000 sec

単純なINSERT文を実行する方と、ほぼ差がありません…。

rewriteBatchedStatementsを使ったバルクINSERT

最後は、rewriteBatchedStatementsを使ったバルクINSERTです。

    private void batchRewriteInsert() throws SQLException, IOException, CsvValidationException {
        Properties properties = new Properties();
        properties.setProperty("user", "mysql_user");
        properties.setProperty("password", "password");
        properties.setProperty("rewriteBatchedStatements", "true");
        // properties.setProperty("profileSQL", "true");

        int commitThreshold = 10000;
        int batchThreshold = 10;
        int count = 0;

        long startTime = System.currentTimeMillis();
        long commitStartTime = System.currentTimeMillis();

        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

        try (CSVReader csvReader = new CSVReader(Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8));
             Connection conn = DriverManager.getConnection("jdbc:mysql://172.17.0.2:3306/practice", properties);
             PreparedStatement statement = conn.prepareStatement("insert into account(id, name, registered, about) values(?, ?, ?, ?)")) {
            conn.setAutoCommit(false);

            String[] columns;

            while ((columns = csvReader.readNext()) != null) {
                count++;

                statement.setInt(1, Integer.parseInt(columns[0]));
                statement.setString(2, columns[1]);
                statement.setTimestamp(3, Timestamp.valueOf(LocalDateTime.parse(columns[2], formatter)));
                statement.setString(4, columns[3]);

                statement.addBatch();

                if (count % batchThreshold == 0) {
                    statement.executeBatch();
                }

                if (count % commitThreshold == 0) {
                    long committedTime = System.currentTimeMillis() - commitStartTime;
                    log("commit, %d records, lap time = %f sec...", count, committedTime * 1.0 / 1000);
                    conn.commit();

                    commitStartTime = System.currentTimeMillis();
                }
            }

            conn.commit();

            long elapsedTime = System.currentTimeMillis() - startTime;
            log("%d records inserted, elapsed time = %f sec", count, elapsedTime * 1.0 / 1000);
        }
    }

バッチ更新のプログラムとの違いは、接続する時のプロパティでrewriteBatchedStatementsをtrueにするだけです。

MySQL :: MySQL Connector/J 8.0 Developer Guide :: 6.3.13 Performance Extensions

        properties.setProperty("rewriteBatchedStatements", "true");

実行の様子。

$ java -jar target/mysql-import-0.0.1-SNAPSHOT.jar batch-rewrite users_500000.csv
[2021-04-18T02:13:25.024357] commit, 10000 records, lap time = 3.168000 sec...
[2021-04-18T02:13:25.943424] commit, 20000 records, lap time = 0.883000 sec...
[2021-04-18T02:13:26.882186] commit, 30000 records, lap time = 0.910000 sec...
[2021-04-18T02:13:27.540862] commit, 40000 records, lap time = 0.627000 sec...
[2021-04-18T02:13:28.173199] commit, 50000 records, lap time = 0.602000 sec...
[2021-04-18T02:13:28.740784] commit, 60000 records, lap time = 0.531000 sec...
[2021-04-18T02:13:29.340097] commit, 70000 records, lap time = 0.568000 sec...
[2021-04-18T02:13:29.860993] commit, 80000 records, lap time = 0.489000 sec...
[2021-04-18T02:13:30.424720] commit, 90000 records, lap time = 0.515000 sec...
[2021-04-18T02:13:30.932128] commit, 100000 records, lap time = 0.480000 sec...

〜省略〜

[2021-04-18T02:13:47.384536] commit, 400000 records, lap time = 0.449000 sec...
[2021-04-18T02:13:47.899068] commit, 410000 records, lap time = 0.479000 sec...
[2021-04-18T02:13:48.434318] commit, 420000 records, lap time = 0.488000 sec...
[2021-04-18T02:13:48.939430] commit, 430000 records, lap time = 0.476000 sec...
[2021-04-18T02:13:49.577309] commit, 440000 records, lap time = 0.591000 sec...
[2021-04-18T02:13:50.101016] commit, 450000 records, lap time = 0.483000 sec...
[2021-04-18T02:13:50.606555] commit, 460000 records, lap time = 0.454000 sec...
[2021-04-18T02:13:51.219517] commit, 470000 records, lap time = 0.579000 sec...
[2021-04-18T02:13:51.783128] commit, 480000 records, lap time = 0.473000 sec...
[2021-04-18T02:13:52.529406] commit, 490000 records, lap time = 0.671000 sec...
[2021-04-18T02:13:53.129715] commit, 500000 records, lap time = 0.537000 sec...
[2021-04-18T02:13:53.178695] 500000 records inserted, elapsed time = 31.325000 sec

これだけで、倍以上に速くなりました。

載せていたコードではコメントアウトしていますが、profileSQLをtrueにするとSQL文が表示されるようになるので

MySQL :: MySQL Connector/J 8.0 Developer Guide :: 6.3.14 Debugging/Profiling

        properties.setProperty("rewriteBatchedStatements", "true");
        properties.setProperty("profileSQL", "true");

バッチ更新がバルクINSERTに変換されている様子が確認できます。
※値も表示されているのですが、載せていません

Sun Apr 18 02:17:00 JST 2021 INFO: [QUERY] insert into account(id, name, registered, about) values(...),(...),(...),(...),(...),(...),(...),(...),(...),(...) [Created on: Sun Apr 18 02:17:00 JST 2021, duration: 1, connection-id: 73, statement-id: 0, resultset-id: 0,   at org.littlewings.mysql.Importer.batchRewriteInsert(Importer.java:181)]
Sun Apr 18 02:17:00 JST 2021 INFO: [FETCH]  [Created on: Sun Apr 18 02:17:00 JST 2021, duration: 0, connection-id: 73, statement-id: 0, resultset-id: 0,    at org.littlewings.mysql.Importer.batchRewriteInsert(Importer.java:181)]

まとめ

MySQLに対して、LOAD DATA、Parallel Table Import Utility、JDBCでの通常のINSERTやバルクINSERTなどを
試してみました。

LOAD DATAやParallel Table Import Utilityが圧倒的に速いですね。

大量のテキストデータをロードするなら、やっぱりこちらなんでしょうねぇ…。

データ量が多いほど差が出そうなので、特にParallel Table Import Utilityと合わせて確認して選ぼうかなと思います。

オマケ

Stack Overflowのユーザーのデータを、XMLからCSVに変換しつつ、指定のレコード数に絞るプログラムを最後に
載せておきます。

Idが正の値しか拾っていなかったり、AboutMeを255文字で切り捨てたり、日時のフォーマットを変えたりといくつか
変換などをしています。

users_to_csv.groovy

import com.opencsv.CSVParser
import com.opencsv.CSVParserBuilder
import com.opencsv.CSVWriterBuilder

import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Paths
import java.time.format.DateTimeFormatter
import javax.xml.stream.XMLInputFactory
import javax.xml.stream.XMLStreamConstants

@Grab('com.opencsv:opencsv:5.4')
import com.opencsv.CSVWriterBuilder
import com.opencsv.CSVParserBuilder

@Grab('org.jsoup:jsoup:1.13.1')
import org.jsoup.Jsoup

def xmlPath = args[0]
def csvSize = args[1] as int

def reader = Files.newBufferedReader(Paths.get(xmlPath), StandardCharsets.UTF_8)
reader.readLine() // skip BOM.

def outputFile = "users_${csvSize}.csv"
def csvWriter = new CSVWriterBuilder(Files.newBufferedWriter(Paths.get(outputFile), StandardCharsets.UTF_8)).withParser(new CSVParserBuilder().withEscapeChar('\\' as char).build()).build()

def datetimeFormatter = DateTimeFormatter.ofPattern('uuuu-MM-dd HH:mm:ss')

def count = 0

def xmlInputFactory = XMLInputFactory.newInstance()
def streamReader = xmlInputFactory.createXMLStreamReader(reader)

while (streamReader.hasNext()) {
    def eventType = streamReader.next()

    if (eventType == XMLStreamConstants.START_ELEMENT) {
        if (streamReader.getLocalName() == 'row') {
            def accountId = streamReader.getAttributeValue(null, "AccountId")
            def name = streamReader.getAttributeValue(null, 'DisplayName')
            def registeredDate = streamReader.getAttributeValue(null, 'CreationDate')
            def about = streamReader.getAttributeValue(null, "AboutMe")

            if (accountId == null || accountId as int < 0) {
                continue
            }

            registeredDate = datetimeFormatter.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(registeredDate))

            if (about == null) {
                about = ''
            } else {
                about = Jsoup.parse(about).text()
            }

            if (about.length() > 255) {
                about = about.substring(0, 252) + "..."
            }

            csvWriter.writeNext(List.of(accountId, name, registeredDate, about).toArray(new String[0]))

            count++

            if (count % 10000 == 0) {
                println("${count} records, generated...")
            }

            if (count == csvSize) {
                break
            }
        }
    }
}

streamReader.close()
csvWriter.close()