CLOVER🍀

That was when it all began.

MySQL 8.0に、LOAD DATA、Parallel Table Import UtilityMySQL Shell、JDBCでCSVロヌドしおみる

これは、なにをしたくお曞いたもの

MySQLにCSVロヌドをしようずした時に、LOAD DATAを䜿ったり、ふ぀うにINSERT文を䜿ったりずいく぀か方法が
あるず思うのですが、どれくらい違うものだろうずいうこずで詊しおみるこずにしたした。

お題

CSVファむルを甚意しお、以䞋の方法を詊しおみたす。

  • LOAD DATA正確には、LOAD DATA LOCAL
  • Parallel Table Import UtilityMySQL Shell
  • CSVを読み蟌み、INSERT文を実行するJavaプログラム
    • シンプルにINSERTを実行
    • JDBCでのバッチ曎新
    • rewriteBatchedStatementsをtrueにしお、バッチ曎新

デヌタには、Stack Overflowのナヌザヌ情報を4カラムのみ、50䞇件だけ切り出しおCSVファむルにしたものを䜿いたす。

あたりデヌタ量自䜓は倧したこずがないのですが、そんなに気長に埅おなかったからです 。

最初に、それぞれの方法に぀いお曞いおおきたす。

LOAD DATA

LOAD DATAは、テキストファむルからMySQLのテヌブルぞデヌタをロヌドする文です。

SQL ステヌトメント / LOAD DATA ステヌトメント

通垞のINSERT文よりも、20倍速いそうです。

テキストファむルからテヌブルをロヌドする堎合は、LOAD DATA を䜿甚したす。 通垞、これは INSERT ステヌトメントを䜿甚する堎合より、20 倍速くなりたす。

最適化 / INSERT ステヌトメントの最適化

ただし、クラむアント偎にあるファむルをロヌドする堎合LOAD DATA LOCALは、セキュリティ䞊の泚意点があり、
デフォルトでは無効になっおいたす。この機胜を䜿うためには、クラむアントサヌバヌ䞡方で有効にする必芁がありたす。

セキュリティヌ / LOAD DATA LOCAL のセキュリティヌ䞊の考慮事項

Parallel Table Import Utility

Parallel Table Import Utilityは、MySQL Shellの機胜です。

MySQL :: MySQL Shell 8.0 :: 8.4 Parallel Table Import Utility

この機胜は、入力ずなるテキストファむルを解析、チャンクに分割しおMySQLにデヌタを䞊列にロヌドするこずができたす。

内郚的にはLOAD DATAが䞊列で実行されるため、通垞のLOAD DATAがシングルスレッドで動䜜するのに察しお
倧幅に高速化を芋蟌むこずができるようです。

䞊列床は、スレッド数の指定およびチャンクサむズから導出されたす。

min{max{1, threads}, chunks}}

LOAD DATAよりも速くなるかどうかは、䜿甚するデヌタファむルのサむズに䟝存するずころもあるので、蚈枬あるのみですね。

その他、入力の前凊理ずしお倉換ができたり、耇数のファむルを䜿ったむンポヌトができたり、圧瞮されたファむルを
ロヌドしたりもできるようです。

rewriteBatchedStatements

MySQLには、INSERT文のVALUES句に倀を耇数曞き、1床のINSERT文で耇数の行を登録する機胜がありたす。
※バルクINSERT

最適化 / InnoDB テヌブルの䞀括デヌタロヌド

Connector/Jでは、rewriteBatchedStatementsずいうプロパティをtrueにするずJDBCのバッチ曎新をバルクINSERTに
曞き換えお実行しおくれたす。

MySQL :: MySQL Connector/J 8.0 Developer Guide :: 6.3.13 Performance Extensions

結果

先に結果だけ曞いおおくず、ふ぀うにINSERT文を実行するよりも、LOAD DATAを䜿ったりParallel Table Import Utilityを
䜿う方が圧倒的に速いです。

LOAD DATAずParallel Table Import Utilityに぀いおは、今回のデヌタ量ではほが差がありたせんでした。
Parallel Table Import Utilityが、若干速いくらいです。扱ったデヌタ量が小さかったですね 。

JDBCでのINSERT文実行は、rewriteBatchedStatementsをtrueにしたバルクINSERTの方が、他の方法の倍以䞊速い結果に
なりたした。

サマリは、こんな感じです。

50䞇行のCSVに察しお。

  • Parallel Table Import Utility 
 6秒皋床
  • LOAD DATA 
 7〜9秒皋床
  • バルクINSERT 
 30秒皋床
  • 単玔なINSERT、およびバッチ曎新 
 80秒皋床

バルクINSERTおよびバッチ曎新のバッチサむズは、10にしおいたす。

では、実際に䜜成した゜ヌスコヌドなどを蚘茉しおいきたす。

環境

今回の環境は、こちらです。

mysql> select version();
+-----------+
| version() |
+-----------+
| 8.0.23    |
+-----------+
1 row in set (0.01 sec)

MySQLサヌバヌは、172.17.0.2で動䜜しおいるものずしたす。チュヌニングは特にしおおらず、ほがデフォルト倀です。

MySQL Shell。

$ mysqlsh --version
/path/to/mysqlsh   Ver 8.0.23 for Linux on x86_64 - for MySQL 8.0.23 (MySQL Community Server (GPL))

たた、Javaに関する環境情報はこちら。

$ java --version
openjdk 11.0.10 2021-01-19
OpenJDK Runtime Environment (build 11.0.10+9-Ubuntu-0ubuntu1.20.04)
OpenJDK 64-Bit Server VM (build 11.0.10+9-Ubuntu-0ubuntu1.20.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 11.0.10, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-72-generic", arch: "amd64", family: "unix"

テヌブル定矩ずデヌタ䜜成

DDLは、こんな感じで甚意。

create table account (
  id int,
  name varchar(50),
  registered datetime,
  about varchar(255),
  primary key(id)
);

デヌタはStack Overflowのナヌザヌを䜿いたした。

$ curl -OL https://archive.org/download/stackexchange/stackoverflow.com-Users.7z
$ 7z x stackoverflow.com-Users.7z

$ ll -h Users.xml
-rw-rw-r-- 1 xxxxx xxxxx 4.4G  3月  1 13:31 Users.xml

$ wc -l Users.xml
14080582 Users.xml

䞭身がXMLなこずず、ちょっず倧きいのでデヌタ抜出ず件数を絞り蟌むのにプログラムを䜜成しおCSVに倉換したした。

$ groovy users_to_csv.groovy Users.xml 500000

$ ll -h users_500000.csv
-rw-rw-r-- 1 xxxxx xxxxx 32M  4月 17 23:00 users_500000.csv

ここで䜿ったプログラムは、最埌に茉せたす。users_500000.csvずいうのが、デヌタロヌドに䜿うCSVファむルです。

ロヌドしたデヌタは、次のク゚リの実行前にはTRUNCATEしおいるものずしたす。

mysql> truncate table account;
Query OK, 0 rows affected (0.21 sec)

では、進めおいきたしょう。

LOAD DATA

たずはLOAD DATAからやっおみたす。正確には、LOAD DATA LOCALですが。

SQL ステヌトメント / LOAD DATA ステヌトメント

mysqlコマンドを起動。

$ mysql -umysql_user -p -h172.17.0.2 practice
Enter password: 
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 22
Server version: 8.0.23 MySQL Community Server - GPL

Copyright (c) 2000, 2021, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> 

実行しようずするず、゚ラヌになりたす。

mysql> load data local infile 'users_500000.csv' into table account fields terminated by ',' optionally enclosed by '"' escaped by '\\';
ERROR 3948 (42000): Loading local data is disabled; this must be enabled on both the client and server sides

゚ラヌメッセヌゞの通り、クラむアントのロヌカルデヌタをクラむアントずサヌバヌそれぞれで有効にする必芁がありたす、ず。

こちらのドキュメントに蚘茉されおいる内容です。

セキュリティヌ / LOAD DATA LOCAL のセキュリティヌ䞊の考慮事項

サヌバヌ偎は、my.cnfのmysqldセクションでlocal-infileを蚭定するか、set global local_infileで蚭定したす。

今回はset global local_infileで有効にするこずにしたした。

mysql> set global local_infile = on;
Query OK, 0 rows affected (0.00 sec)

再床実行 するず、゚ラヌメッセヌゞが倉わりたす。

mysql> load data local infile 'users_500000.csv' into table account fields terminated by ',' optionally enclosed by '"' escaped by '\\';
ERROR 2068 (HY000): LOAD DATA LOCAL INFILE file request rejected due to restrictions on access.

クラむアント偎も、この機胜を有効にする必芁があるからです。

my.cnfのclientセクションでlocal-infileを蚭定するか、mysqlコマンドの起動時に--local-infileで蚭定したす。

今回は、mysqlコマンドのオプションで指定したした。

$ mysql -umysql_user -p -h172.17.0.2 --local-infile=on practice

これでロヌドし盎すず、今床は成功したした。

mysql> load data local infile 'users_500000.csv' into table account fields terminated by ',' optionally enclosed by '"' escaped by '\\';
Query OK, 500000 rows affected (7.73 sec)
Records: 500000  Deleted: 0  Skipped: 0  Warnings: 0

Parallel Table Import Utility

次は、Parallel Table Import Utilityです。

MySQL :: MySQL Shell 8.0 :: 8.4 Parallel Table Import Utility

こちらは、MySQL Shellの機胜になりたす。

mysqlshコマンドから盎接実行する方法ず、JavaScriptやPythonで実行する方法がありたす。

どちらの方法を䜿うにしろ、内郚的には結局LOAD DATAを䜿うので、こちらを有効にしおおく必芁がありたす。

mysql> set global local_infile = on;
Query OK, 0 rows affected (0.00 sec)

たずはmysqlshコマンドで実行しおみたしょう。

$ mysqlsh mysql_user@172.17.0.2:3306 -- util import-table users_500000.csv --schema=practice --table=account --fieldsTerminatedBy=',' --fieldsEnclosedBy='"' --fieldsOptionallyEnclosed=true --fieldsEscapedBy='\' --bytesPerChunk=10M
Please provide the password for 'mysql_user@172.17.0.2:3306': ********
Importing from file '/path/to/users_500000.csv' to table `practice`.`account` in MySQL Server at 172.17.0.2:3306 using 4 threads
[Worker003] users_500000.csv: Records: 43164  Deleted: 0  Skipped: 0  Warnings: 0
[Worker002] users_500000.csv: Records: 131723  Deleted: 0  Skipped: 0  Warnings: 0
[Worker001] users_500000.csv: Records: 163859  Deleted: 0  Skipped: 0  Warnings: 0
[Worker000] users_500000.csv: Records: 161254  Deleted: 0  Skipped: 0  Warnings: 0
100% (32.62 MB / 32.62 MB), 5.31 MB/s
File '/path/to/users_500000.csv' (32.62 MB) was imported in 6.1474 sec at 5.31 MB/s
Total rows affected in practice.account: Records: 500000  Deleted: 0  Skipped: 0  Warnings: 0

fields〜オプションの意味は、LOAD DATAず同じです。schemaでデヌタ登録先のデヌタベヌス名、tableでテヌブル名を
指定したす。

ドキュメントを芋るず、耇数のファむルを䞀気に指定するこずもできるようです。

䜕回か詊しおみたしたが、LOAD DATAよりも気持ち速いですが、そこたで差はありたせん。察象のファむルサむズが
小さいようですね。

なお、今回は4぀のスレッドが䜿われおいたした。

using 4 threads

スレッド数を決定する倉数は2぀あり、ひず぀は今回指定したbytesPerChunk、もうひず぀はthreadsです。

bytesPerChunkのデフォルト倀は50Mで、threadsのデフォルト倀は8です。

ドキュメントの蚈算匏によるず、1ずthreadsの倧きい方ず、チャンクサむズのうち、小さい方が遞択されたす。

min{max{1, threads}, chunks}}

今回はチャンクサむズを10Mにしお、ファむルサむズが32Mだったので4になり、threadsのデフォルト倀8よりも小さいので
4スレッドになったずいうこずですね。

$ ll -h users_500000.csv
-rw-rw-r-- 1 xxxxx xxxxx 32M  4月 17 23:00 users_500000.csv

ずころで、mysql_user:password@172.17.0.2:3306みたいな蚘述にするず、パスワヌドも指定しお実行できるようです。

JavaScriptでも実行しおみたしょう。たずはmysqlshコマンドでログむン。

$ mysqlsh mysql_user@172.17.0.2:3306
Please provide the password for 'mysql_user@172.17.0.2:3306': ********
MySQL Shell 8.0.23

Copyright (c) 2016, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its affiliates.
Other names may be trademarks of their respective owners.

Type '\help' or '\?' for help; '\quit' to exit.
Creating a session to 'mysql_user@172.17.0.2:3306'
Fetching schema names for autocompletion... Press ^C to stop.
Your MySQL connection id is 59
Server version: 8.0.23 MySQL Community Server - GPL
No default schema selected; type \use <schema> to set one.
 MySQL  172.17.0.2:3306 ssl  JS > 

実行。

 MySQL  172.17.0.2:3306 ssl  JS > util.importTable('users_500000.csv', {schema: 'practice', table: 'account', fieldsTerminatedBy: ',', fieldsEnclosedBy: '"', fieldsOptionallyEnclosed: true, fieldsEscapedBy: '\\', bytesPerChunk: '10M'})
Importing from file '/path/to/users_500000.csv' to table `practice`.`account` in MySQL Server at 172.17.0.2:3306 using 4 threads
[Worker002] users_500000.csv: Records: 43164  Deleted: 0  Skipped: 0  Warnings: 0
[Worker003] users_500000.csv: Records: 131723  Deleted: 0  Skipped: 0  Warnings: 0
[Worker001] users_500000.csv: Records: 161254  Deleted: 0  Skipped: 0  Warnings: 0
[Worker000] users_500000.csv: Records: 163859  Deleted: 0  Skipped: 0  Warnings: 0
100% (32.62 MB / 32.62 MB), 5.89 MB/s
File ''/path/to/users_500000.csv' (32.62 MB) was imported in 5.5399 sec at 5.89 MB/s
Total rows affected in practice.account: Records: 500000  Deleted: 0  Skipped: 0  Warnings: 0

オプションなどの意味は、先ほどず倉わらないので割愛。

JDBC

ここからは、Javaでプログラムを曞いおいきたす。

Maven䟝存関係ず、プラグむンの蚭定。

    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.23</version>
        </dependency>

        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>5.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.littlewings.mysql.Importer</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

実行可胜JARファむルを䜜成したす。

プログラムの雛圢は、こんな感じです。

src/main/java/org/littlewings/mysql/Importer.java

package org.littlewings.mysql;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import com.opencsv.CSVReader;
import com.opencsv.exceptions.CsvValidationException;

public class Importer {
    String mode;
    String filePath;

    public static void main(String... args) throws SQLException, IOException, CsvValidationException {
        Importer importer = new Importer(args[0], args[1]);
        importer.execute();
    }

    public Importer(String mode, String filePath) {
        this.mode = mode;
        this.filePath = filePath;
    }

    public void execute() throws SQLException, IOException, CsvValidationException {
        switch (mode) {
            case "simple":
                simpleInsert();
                break;
            case "batch":
                batchInsert();
                break;
            case "batch-rewrite":
                batchRewriteInsert();
                break;
            default:
                System.out.printf("unknown mode [%s]%n", mode);
                break;
        }
    }

    〜省略〜

    private void log(String format, Object... args) {
        System.out.printf("[%s] %s%n", LocalDateTime.now(), String.format(format, args));
    }
}

実行する皮類によっお、simple、batch、batch-rewriteの3぀を甚意したした。

パッケヌゞングしたら

$ mvn package

こんな感じで実行したす。

$ java -jar target/mysql-import-0.0.1-SNAPSHOT.jar [皮類] users_500000.csv

では、それぞれ皮類ごずに曞いおいきたす。

単玔にINSERT文を実行する

単玔にINSERT文を実行するパタヌンのコヌドは、こちら。

    private void simpleInsert() throws SQLException, IOException, CsvValidationException {
        Properties properties = new Properties();
        properties.setProperty("user", "mysql_user");
        properties.setProperty("password", "password");

        int commitThreshold = 10000;
        int count = 0;

        long startTime = System.currentTimeMillis();
        long commitStartTime = System.currentTimeMillis();

        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

        try (CSVReader csvReader = new CSVReader(Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8));
             Connection conn = DriverManager.getConnection("jdbc:mysql://172.17.0.2:3306/practice", properties);
             PreparedStatement statement = conn.prepareStatement("insert into account(id, name, registered, about) values(?, ?, ?, ?)")) {
            conn.setAutoCommit(false);

            String[] columns;

            while ((columns = csvReader.readNext()) != null) {
                count++;

                statement.setInt(1, Integer.parseInt(columns[0]));
                statement.setString(2, columns[1]);
                statement.setTimestamp(3, Timestamp.valueOf(LocalDateTime.parse(columns[2], formatter)));
                statement.setString(4, columns[3]);

                statement.executeUpdate();

                if (count % commitThreshold == 0) {
                    long committedTime = System.currentTimeMillis() - commitStartTime;
                    log("commit, %d records, lap time = %f sec...", count, committedTime * 1.0 / 1000);
                    conn.commit();

                    commitStartTime = System.currentTimeMillis();
                }
            }

            conn.commit();

            long elapsedTime = System.currentTimeMillis() - startTime;
            log("%d records inserted, elapsed time = %f sec", count, elapsedTime * 1.0 / 1000);
        }
    }

CSVファむルを1行ず぀読んでINSERTし、10,000件ごずにコミットするようにしたした。

実行の様子は、こちら。

$ java -jar target/mysql-import-0.0.1-SNAPSHOT.jar simple users_500000.csv
[2021-04-18T02:08:52.365205] commit, 10000 records, lap time = 3.998000 sec...
[2021-04-18T02:08:54.063781] commit, 20000 records, lap time = 1.643000 sec...
[2021-04-18T02:08:55.629334] commit, 30000 records, lap time = 1.521000 sec...
[2021-04-18T02:08:57.112967] commit, 40000 records, lap time = 1.431000 sec...
[2021-04-18T02:08:58.513753] commit, 50000 records, lap time = 1.357000 sec...
[2021-04-18T02:09:00.039572] commit, 60000 records, lap time = 1.455000 sec...
[2021-04-18T02:09:01.461116] commit, 70000 records, lap time = 1.380000 sec...
[2021-04-18T02:09:02.883278] commit, 80000 records, lap time = 1.379000 sec...
[2021-04-18T02:09:04.375070] commit, 90000 records, lap time = 1.431000 sec...
[2021-04-18T02:09:05.878608] commit, 100000 records, lap time = 1.464000 sec...

〜省略〜

[2021-04-18T02:09:50.253484] commit, 400000 records, lap time = 1.357000 sec...
[2021-04-18T02:09:51.730666] commit, 410000 records, lap time = 1.402000 sec...
[2021-04-18T02:09:53.159438] commit, 420000 records, lap time = 1.367000 sec...
[2021-04-18T02:09:54.605684] commit, 430000 records, lap time = 1.394000 sec...
[2021-04-18T02:09:56.128087] commit, 440000 records, lap time = 1.459000 sec...
[2021-04-18T02:09:57.562830] commit, 450000 records, lap time = 1.381000 sec...
[2021-04-18T02:09:59.001128] commit, 460000 records, lap time = 1.368000 sec...
[2021-04-18T02:10:00.460900] commit, 470000 records, lap time = 1.386000 sec...
[2021-04-18T02:10:01.823628] commit, 480000 records, lap time = 1.308000 sec...
[2021-04-18T02:10:03.394428] commit, 490000 records, lap time = 1.480000 sec...
[2021-04-18T02:10:04.834426] commit, 500000 records, lap time = 1.366000 sec...
[2021-04-18T02:10:04.919687] 500000 records inserted, elapsed time = 76.556000 sec
バッチ曎新

次は、バッチ曎新。

    private void batchInsert() throws SQLException, IOException, CsvValidationException {
        Properties properties = new Properties();
        properties.setProperty("user", "mysql_user");
        properties.setProperty("password", "password");

        int commitThreshold = 10000;
        int batchThreshold = 10;
        int count = 0;

        long startTime = System.currentTimeMillis();
        long commitStartTime = System.currentTimeMillis();

        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

        try (CSVReader csvReader = new CSVReader(Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8));
             Connection conn = DriverManager.getConnection("jdbc:mysql://172.17.0.2:3306/practice", properties);
             PreparedStatement statement = conn.prepareStatement("insert into account(id, name, registered, about) values(?, ?, ?, ?)")) {
            conn.setAutoCommit(false);

            String[] columns;

            while ((columns = csvReader.readNext()) != null) {
                count++;

                statement.setInt(1, Integer.parseInt(columns[0]));
                statement.setString(2, columns[1]);
                statement.setTimestamp(3, Timestamp.valueOf(LocalDateTime.parse(columns[2], formatter)));
                statement.setString(4, columns[3]);

                statement.addBatch();

                if (count % batchThreshold == 0) {
                    statement.executeBatch();
                }

                if (count % commitThreshold == 0) {
                    long committedTime = System.currentTimeMillis() - commitStartTime;
                    log("commit, %d records, lap time = %f sec...", count, committedTime * 1.0 / 1000);
                    conn.commit();

                    commitStartTime = System.currentTimeMillis();
                }
            }

            conn.commit();

            long elapsedTime = System.currentTimeMillis() - startTime;
            log("%d records inserted, elapsed time = %f sec", count, elapsedTime * 1.0 / 1000);
        }
    }

先ほどずは違っお、10件ごずにバッチ曎新するようにしたした。

                statement.addBatch();

                if (count % batchThreshold == 0) {
                    statement.executeBatch();
                }

実行の様子。

$ java -jar target/mysql-import-0.0.1-SNAPSHOT.jar batch users_500000.csv
[2021-04-18T02:11:18.837457] commit, 10000 records, lap time = 4.386000 sec...
[2021-04-18T02:11:20.909568] commit, 20000 records, lap time = 2.016000 sec...
[2021-04-18T02:11:22.369608] commit, 30000 records, lap time = 1.411000 sec...
[2021-04-18T02:11:23.981233] commit, 40000 records, lap time = 1.564000 sec...
[2021-04-18T02:11:25.655595] commit, 50000 records, lap time = 1.605000 sec...
[2021-04-18T02:11:27.154502] commit, 60000 records, lap time = 1.452000 sec...
[2021-04-18T02:11:28.629899] commit, 70000 records, lap time = 1.435000 sec...
[2021-04-18T02:11:30.353573] commit, 80000 records, lap time = 1.684000 sec...
[2021-04-18T02:11:31.992758] commit, 90000 records, lap time = 1.599000 sec...
[2021-04-18T02:11:33.567422] commit, 100000 records, lap time = 1.529000 sec...

〜省略〜

[2021-04-18T02:12:17.328760] commit, 400000 records, lap time = 1.351000 sec...
[2021-04-18T02:12:18.712159] commit, 410000 records, lap time = 1.329000 sec...
[2021-04-18T02:12:20.172459] commit, 420000 records, lap time = 1.415000 sec...
[2021-04-18T02:12:21.583530] commit, 430000 records, lap time = 1.346000 sec...
[2021-04-18T02:12:23.183495] commit, 440000 records, lap time = 1.525000 sec...
[2021-04-18T02:12:24.645953] commit, 450000 records, lap time = 1.409000 sec...
[2021-04-18T02:12:26.059431] commit, 460000 records, lap time = 1.355000 sec...
[2021-04-18T02:12:27.586332] commit, 470000 records, lap time = 1.427000 sec...
[2021-04-18T02:12:29.061952] commit, 480000 records, lap time = 1.403000 sec...
[2021-04-18T02:12:30.537175] commit, 490000 records, lap time = 1.425000 sec...
[2021-04-18T02:12:31.900668] commit, 500000 records, lap time = 1.290000 sec...
[2021-04-18T02:12:31.957996] 500000 records inserted, elapsed time = 77.511000 sec

単玔なINSERT文を実行する方ず、ほが差がありたせん 。

rewriteBatchedStatementsを䜿ったバルクINSERT

最埌は、rewriteBatchedStatementsを䜿ったバルクINSERTです。

    private void batchRewriteInsert() throws SQLException, IOException, CsvValidationException {
        Properties properties = new Properties();
        properties.setProperty("user", "mysql_user");
        properties.setProperty("password", "password");
        properties.setProperty("rewriteBatchedStatements", "true");
        // properties.setProperty("profileSQL", "true");

        int commitThreshold = 10000;
        int batchThreshold = 10;
        int count = 0;

        long startTime = System.currentTimeMillis();
        long commitStartTime = System.currentTimeMillis();

        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

        try (CSVReader csvReader = new CSVReader(Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8));
             Connection conn = DriverManager.getConnection("jdbc:mysql://172.17.0.2:3306/practice", properties);
             PreparedStatement statement = conn.prepareStatement("insert into account(id, name, registered, about) values(?, ?, ?, ?)")) {
            conn.setAutoCommit(false);

            String[] columns;

            while ((columns = csvReader.readNext()) != null) {
                count++;

                statement.setInt(1, Integer.parseInt(columns[0]));
                statement.setString(2, columns[1]);
                statement.setTimestamp(3, Timestamp.valueOf(LocalDateTime.parse(columns[2], formatter)));
                statement.setString(4, columns[3]);

                statement.addBatch();

                if (count % batchThreshold == 0) {
                    statement.executeBatch();
                }

                if (count % commitThreshold == 0) {
                    long committedTime = System.currentTimeMillis() - commitStartTime;
                    log("commit, %d records, lap time = %f sec...", count, committedTime * 1.0 / 1000);
                    conn.commit();

                    commitStartTime = System.currentTimeMillis();
                }
            }

            conn.commit();

            long elapsedTime = System.currentTimeMillis() - startTime;
            log("%d records inserted, elapsed time = %f sec", count, elapsedTime * 1.0 / 1000);
        }
    }

バッチ曎新のプログラムずの違いは、接続する時のプロパティでrewriteBatchedStatementsをtrueにするだけです。

MySQL :: MySQL Connector/J 8.0 Developer Guide :: 6.3.13 Performance Extensions

        properties.setProperty("rewriteBatchedStatements", "true");

実行の様子。

$ java -jar target/mysql-import-0.0.1-SNAPSHOT.jar batch-rewrite users_500000.csv
[2021-04-18T02:13:25.024357] commit, 10000 records, lap time = 3.168000 sec...
[2021-04-18T02:13:25.943424] commit, 20000 records, lap time = 0.883000 sec...
[2021-04-18T02:13:26.882186] commit, 30000 records, lap time = 0.910000 sec...
[2021-04-18T02:13:27.540862] commit, 40000 records, lap time = 0.627000 sec...
[2021-04-18T02:13:28.173199] commit, 50000 records, lap time = 0.602000 sec...
[2021-04-18T02:13:28.740784] commit, 60000 records, lap time = 0.531000 sec...
[2021-04-18T02:13:29.340097] commit, 70000 records, lap time = 0.568000 sec...
[2021-04-18T02:13:29.860993] commit, 80000 records, lap time = 0.489000 sec...
[2021-04-18T02:13:30.424720] commit, 90000 records, lap time = 0.515000 sec...
[2021-04-18T02:13:30.932128] commit, 100000 records, lap time = 0.480000 sec...

〜省略〜

[2021-04-18T02:13:47.384536] commit, 400000 records, lap time = 0.449000 sec...
[2021-04-18T02:13:47.899068] commit, 410000 records, lap time = 0.479000 sec...
[2021-04-18T02:13:48.434318] commit, 420000 records, lap time = 0.488000 sec...
[2021-04-18T02:13:48.939430] commit, 430000 records, lap time = 0.476000 sec...
[2021-04-18T02:13:49.577309] commit, 440000 records, lap time = 0.591000 sec...
[2021-04-18T02:13:50.101016] commit, 450000 records, lap time = 0.483000 sec...
[2021-04-18T02:13:50.606555] commit, 460000 records, lap time = 0.454000 sec...
[2021-04-18T02:13:51.219517] commit, 470000 records, lap time = 0.579000 sec...
[2021-04-18T02:13:51.783128] commit, 480000 records, lap time = 0.473000 sec...
[2021-04-18T02:13:52.529406] commit, 490000 records, lap time = 0.671000 sec...
[2021-04-18T02:13:53.129715] commit, 500000 records, lap time = 0.537000 sec...
[2021-04-18T02:13:53.178695] 500000 records inserted, elapsed time = 31.325000 sec

これだけで、倍以䞊に速くなりたした。

茉せおいたコヌドではコメントアりトしおいたすが、profileSQLをtrueにするずSQL文が衚瀺されるようになるので

MySQL :: MySQL Connector/J 8.0 Developer Guide :: 6.3.14 Debugging/Profiling

        properties.setProperty("rewriteBatchedStatements", "true");
        properties.setProperty("profileSQL", "true");

バッチ曎新がバルクINSERTに倉換されおいる様子が確認できたす。
※倀も衚瀺されおいるのですが、茉せおいたせん

Sun Apr 18 02:17:00 JST 2021 INFO: [QUERY] insert into account(id, name, registered, about) values(...),(...),(...),(...),(...),(...),(...),(...),(...),(...) [Created on: Sun Apr 18 02:17:00 JST 2021, duration: 1, connection-id: 73, statement-id: 0, resultset-id: 0,   at org.littlewings.mysql.Importer.batchRewriteInsert(Importer.java:181)]
Sun Apr 18 02:17:00 JST 2021 INFO: [FETCH]  [Created on: Sun Apr 18 02:17:00 JST 2021, duration: 0, connection-id: 73, statement-id: 0, resultset-id: 0,    at org.littlewings.mysql.Importer.batchRewriteInsert(Importer.java:181)]

たずめ

MySQLに察しお、LOAD DATA、Parallel Table Import Utility、JDBCでの通垞のINSERTやバルクINSERTなどを
詊しおみたした。

LOAD DATAやParallel Table Import Utilityが圧倒的に速いですね。

倧量のテキストデヌタをロヌドするなら、やっぱりこちらなんでしょうねぇ 。

デヌタ量が倚いほど差が出そうなので、特にParallel Table Import Utilityず合わせお確認しお遞がうかなず思いたす。

オマケ

Stack Overflowのナヌザヌのデヌタを、XMLからCSVに倉換し぀぀、指定のレコヌド数に絞るプログラムを最埌に
茉せおおきたす。

Idが正の倀しか拟っおいなかったり、AboutMeを255文字で切り捚おたり、日時のフォヌマットを倉えたりずいく぀か
倉換などをしおいたす。

users_to_csv.groovy

import com.opencsv.CSVParser
import com.opencsv.CSVParserBuilder
import com.opencsv.CSVWriterBuilder

import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Paths
import java.time.format.DateTimeFormatter
import javax.xml.stream.XMLInputFactory
import javax.xml.stream.XMLStreamConstants

@Grab('com.opencsv:opencsv:5.4')
import com.opencsv.CSVWriterBuilder
import com.opencsv.CSVParserBuilder

@Grab('org.jsoup:jsoup:1.13.1')
import org.jsoup.Jsoup

def xmlPath = args[0]
def csvSize = args[1] as int

def reader = Files.newBufferedReader(Paths.get(xmlPath), StandardCharsets.UTF_8)
reader.readLine() // skip BOM.

def outputFile = "users_${csvSize}.csv"
def csvWriter = new CSVWriterBuilder(Files.newBufferedWriter(Paths.get(outputFile), StandardCharsets.UTF_8)).withParser(new CSVParserBuilder().withEscapeChar('\\' as char).build()).build()

def datetimeFormatter = DateTimeFormatter.ofPattern('uuuu-MM-dd HH:mm:ss')

def count = 0

def xmlInputFactory = XMLInputFactory.newInstance()
def streamReader = xmlInputFactory.createXMLStreamReader(reader)

while (streamReader.hasNext()) {
    def eventType = streamReader.next()

    if (eventType == XMLStreamConstants.START_ELEMENT) {
        if (streamReader.getLocalName() == 'row') {
            def accountId = streamReader.getAttributeValue(null, "AccountId")
            def name = streamReader.getAttributeValue(null, 'DisplayName')
            def registeredDate = streamReader.getAttributeValue(null, 'CreationDate')
            def about = streamReader.getAttributeValue(null, "AboutMe")

            if (accountId == null || accountId as int < 0) {
                continue
            }

            registeredDate = datetimeFormatter.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(registeredDate))

            if (about == null) {
                about = ''
            } else {
                about = Jsoup.parse(about).text()
            }

            if (about.length() > 255) {
                about = about.substring(0, 252) + "..."
            }

            csvWriter.writeNext(List.of(accountId, name, registeredDate, about).toArray(new String[0]))

            count++

            if (count % 10000 == 0) {
                println("${count} records, generated...")
            }

            if (count == csvSize) {
                break
            }
        }
    }
}

streamReader.close()
csvWriter.close()