これは、なにをしたくて書いたもの?
MySQLにCSVロードをしようとした時に、LOAD DATA
を使ったり、ふつうにINSERT文を使ったりといくつか方法が
あると思うのですが、どれくらい違うものだろう?ということで試してみることにしました。
お題
CSVファイルを用意して、以下の方法を試してみます。
LOAD DATA
(正確には、LOAD DATA LOCAL
)- Parallel Table Import Utility(MySQL Shell)
- CSVを読み込み、INSERT文を実行するJavaプログラム
- シンプルにINSERTを実行
- JDBCでのバッチ更新
rewriteBatchedStatements
をtrue
にして、バッチ更新
データには、Stack Overflowのユーザー情報を4カラムのみ、50万件だけ切り出してCSVファイルにしたものを使います。
あまりデータ量自体は大したことがないのですが、そんなに気長に待てなかったからです…。
最初に、それぞれの方法について書いておきます。
LOAD DATA
LOAD DATA
は、テキストファイルからMySQLのテーブルへデータをロードする文です。
SQL ステートメント / LOAD DATA ステートメント
通常のINSERT文よりも、20倍速いそうです。
テキストファイルからテーブルをロードする場合は、LOAD DATA を使用します。 通常、これは INSERT ステートメントを使用する場合より、20 倍速くなります。
ただし、クライアント側にあるファイルをロードする場合(LOAD DATA LOCAL
)は、セキュリティ上の注意点があり、
デフォルトでは無効になっています。この機能を使うためには、クライアント/サーバー両方で有効にする必要があります。
セキュリティー / LOAD DATA LOCAL のセキュリティー上の考慮事項
Parallel Table Import Utility
Parallel Table Import Utilityは、MySQL Shellの機能です。
MySQL :: MySQL Shell 8.0 :: 11.4 Parallel Table Import Utility
この機能は、入力となるテキストファイルを解析、チャンクに分割してMySQLにデータを並列にロードすることができます。
内部的にはLOAD DATA
が並列で実行されるため、通常のLOAD DATA
がシングルスレッドで動作するのに対して
大幅に高速化を見込むことができるようです。
並列度は、スレッド数の指定およびチャンクサイズから導出されます。
min{max{1, threads}, chunks}}
LOAD DATA
よりも速くなるかどうかは、使用するデータファイルのサイズに依存するところもあるので、計測あるのみですね。
その他、入力の前処理として変換ができたり、複数のファイルを使ったインポートができたり、圧縮されたファイルを
ロードしたりもできるようです。
rewriteBatchedStatements
MySQLには、INSERT文のVALUES句に値を複数書き、1度のINSERT文で複数の行を登録する機能があります。
※バルクINSERT
Connector/Jでは、rewriteBatchedStatements
というプロパティをtrue
にするとJDBCのバッチ更新をバルクINSERTに
書き換えて実行してくれます。
https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-performance-extensions.html
結果
先に結果だけ書いておくと、ふつうにINSERT文を実行するよりも、LOAD DATA
を使ったりParallel Table Import Utilityを
使う方が圧倒的に速いです。
LOAD DATA
とParallel Table Import Utilityについては、今回のデータ量ではほぼ差がありませんでした。
Parallel Table Import Utilityが、若干速いくらいです。扱ったデータ量が小さかったですね…。
JDBCでのINSERT文実行は、rewriteBatchedStatements
をtrue
にしたバルクINSERTの方が、他の方法の倍以上速い結果に
なりました。
サマリは、こんな感じです。
50万行のCSVに対して。
- Parallel Table Import Utility … 6秒程度
LOAD DATA
… 7〜9秒程度- バルクINSERT … 30秒程度
- 単純なINSERT、およびバッチ更新 … 80秒程度
バルクINSERTおよびバッチ更新のバッチサイズは、10にしています。
では、実際に作成したソースコードなどを記載していきます。
環境
今回の環境は、こちらです。
mysql> select version(); +-----------+ | version() | +-----------+ | 8.0.23 | +-----------+ 1 row in set (0.01 sec)
MySQLサーバーは、172.17.0.2で動作しているものとします。チューニングは特にしておらず、ほぼデフォルト値です。
MySQL Shell。
$ mysqlsh --version /path/to/mysqlsh Ver 8.0.23 for Linux on x86_64 - for MySQL 8.0.23 (MySQL Community Server (GPL))
また、Javaに関する環境情報はこちら。
$ java --version openjdk 11.0.10 2021-01-19 OpenJDK Runtime Environment (build 11.0.10+9-Ubuntu-0ubuntu1.20.04) OpenJDK 64-Bit Server VM (build 11.0.10+9-Ubuntu-0ubuntu1.20.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 11.0.10, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-72-generic", arch: "amd64", family: "unix"
テーブル定義とデータ作成
DDLは、こんな感じで用意。
create table account ( id int, name varchar(50), registered datetime, about varchar(255), primary key(id) );
データはStack Overflowのユーザーを使いました。
$ curl -OL https://archive.org/download/stackexchange/stackoverflow.com-Users.7z $ 7z x stackoverflow.com-Users.7z $ ll -h Users.xml -rw-rw-r-- 1 xxxxx xxxxx 4.4G 3月 1 13:31 Users.xml $ wc -l Users.xml 14080582 Users.xml
中身がXMLなことと、ちょっと大きいのでデータ抽出と件数を絞り込むのにプログラムを作成してCSVに変換しました。
$ groovy users_to_csv.groovy Users.xml 500000 $ ll -h users_500000.csv -rw-rw-r-- 1 xxxxx xxxxx 32M 4月 17 23:00 users_500000.csv
ここで使ったプログラムは、最後に載せます。users_500000.csv
というのが、データロードに使うCSVファイルです。
ロードしたデータは、次のクエリの実行前にはTRUNCATE
しているものとします。
mysql> truncate table account; Query OK, 0 rows affected (0.21 sec)
では、進めていきましょう。
LOAD DATA
まずはLOAD DATA
からやってみます。正確には、LOAD DATA LOCAL
ですが。
SQL ステートメント / LOAD DATA ステートメント
mysql
コマンドを起動。
$ mysql -umysql_user -p -h172.17.0.2 practice Enter password: Reading table information for completion of table and column names You can turn off this feature to get a quicker startup with -A Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 22 Server version: 8.0.23 MySQL Community Server - GPL Copyright (c) 2000, 2021, Oracle and/or its affiliates. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql>
実行しようとすると、エラーになります。
mysql> load data local infile 'users_500000.csv' into table account fields terminated by ',' optionally enclosed by '"' escaped by '\\'; ERROR 3948 (42000): Loading local data is disabled; this must be enabled on both the client and server sides
エラーメッセージの通り、クライアントのローカルデータをクライアントとサーバーそれぞれで有効にする必要があります、と。
こちらのドキュメントに記載されている内容です。
セキュリティー / LOAD DATA LOCAL のセキュリティー上の考慮事項
サーバー側は、my.cnf
のmysqld
セクションでlocal-infile
を設定するか、set global local_infile
で設定します。
今回はset global local_infile
で有効にすることにしました。
mysql> set global local_infile = on; Query OK, 0 rows affected (0.00 sec)
再度実行…すると、エラーメッセージが変わります。
mysql> load data local infile 'users_500000.csv' into table account fields terminated by ',' optionally enclosed by '"' escaped by '\\'; ERROR 2068 (HY000): LOAD DATA LOCAL INFILE file request rejected due to restrictions on access.
クライアント側も、この機能を有効にする必要があるからです。
my.cnf
のclient
セクションでlocal-infile
を設定するか、mysql
コマンドの起動時に--local-infile
で設定します。
今回は、mysql
コマンドのオプションで指定しました。
$ mysql -umysql_user -p -h172.17.0.2 --local-infile=on practice
これでロードし直すと、今度は成功しました。
mysql> load data local infile 'users_500000.csv' into table account fields terminated by ',' optionally enclosed by '"' escaped by '\\'; Query OK, 500000 rows affected (7.73 sec) Records: 500000 Deleted: 0 Skipped: 0 Warnings: 0
Parallel Table Import Utility
次は、Parallel Table Import Utilityです。
MySQL :: MySQL Shell 8.0 :: 11.4 Parallel Table Import Utility
こちらは、MySQL Shellの機能になります。
mysqlsh
コマンドから直接実行する方法と、JavaScriptやPythonで実行する方法があります。
どちらの方法を使うにしろ、内部的には結局LOAD DATA
を使うので、こちらを有効にしておく必要があります。
mysql> set global local_infile = on; Query OK, 0 rows affected (0.00 sec)
まずはmysqlsh
コマンドで実行してみましょう。
$ mysqlsh mysql_user@172.17.0.2:3306 -- util import-table users_500000.csv --schema=practice --table=account --fieldsTerminatedBy=',' --fieldsEnclosedBy='"' --fieldsOptionallyEnclosed=true --fieldsEscapedBy='\' --bytesPerChunk=10M Please provide the password for 'mysql_user@172.17.0.2:3306': ******** Importing from file '/path/to/users_500000.csv' to table `practice`.`account` in MySQL Server at 172.17.0.2:3306 using 4 threads [Worker003] users_500000.csv: Records: 43164 Deleted: 0 Skipped: 0 Warnings: 0 [Worker002] users_500000.csv: Records: 131723 Deleted: 0 Skipped: 0 Warnings: 0 [Worker001] users_500000.csv: Records: 163859 Deleted: 0 Skipped: 0 Warnings: 0 [Worker000] users_500000.csv: Records: 161254 Deleted: 0 Skipped: 0 Warnings: 0 100% (32.62 MB / 32.62 MB), 5.31 MB/s File '/path/to/users_500000.csv' (32.62 MB) was imported in 6.1474 sec at 5.31 MB/s Total rows affected in practice.account: Records: 500000 Deleted: 0 Skipped: 0 Warnings: 0
fields〜
オプションの意味は、LOAD DATA
と同じです。schema
でデータ登録先のデータベース名、table
でテーブル名を
指定します。
ドキュメントを見ると、複数のファイルを一気に指定することもできるようです。
何回か試してみましたが、LOAD DATA
よりも気持ち速いですが、そこまで差はありません。対象のファイルサイズが
小さいようですね。
なお、今回は4つのスレッドが使われていました。
using 4 threads
スレッド数を決定する変数は2つあり、ひとつは今回指定したbytesPerChunk
、もうひとつはthreads
です。
bytesPerChunk
のデフォルト値は50Mで、threads
のデフォルト値は8です。
ドキュメントの計算式によると、1とthreads
の大きい方と、チャンクサイズのうち、小さい方が選択されます。
min{max{1, threads}, chunks}}
今回はチャンクサイズを10Mにして、ファイルサイズが32Mだったので4になり、threads
のデフォルト値8よりも小さいので
4スレッドになったということですね。
$ ll -h users_500000.csv -rw-rw-r-- 1 xxxxx xxxxx 32M 4月 17 23:00 users_500000.csv
ところで、mysql_user:password@172.17.0.2:3306
みたいな記述にすると、パスワードも指定して実行できるようです。
JavaScriptでも実行してみましょう。まずはmysqlsh
コマンドでログイン。
$ mysqlsh mysql_user@172.17.0.2:3306 Please provide the password for 'mysql_user@172.17.0.2:3306': ******** MySQL Shell 8.0.23 Copyright (c) 2016, 2021, Oracle and/or its affiliates. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type '\help' or '\?' for help; '\quit' to exit. Creating a session to 'mysql_user@172.17.0.2:3306' Fetching schema names for autocompletion... Press ^C to stop. Your MySQL connection id is 59 Server version: 8.0.23 MySQL Community Server - GPL No default schema selected; type \use <schema> to set one. MySQL 172.17.0.2:3306 ssl JS >
実行。
MySQL 172.17.0.2:3306 ssl JS > util.importTable('users_500000.csv', {schema: 'practice', table: 'account', fieldsTerminatedBy: ',', fieldsEnclosedBy: '"', fieldsOptionallyEnclosed: true, fieldsEscapedBy: '\\', bytesPerChunk: '10M'}) Importing from file '/path/to/users_500000.csv' to table `practice`.`account` in MySQL Server at 172.17.0.2:3306 using 4 threads [Worker002] users_500000.csv: Records: 43164 Deleted: 0 Skipped: 0 Warnings: 0 [Worker003] users_500000.csv: Records: 131723 Deleted: 0 Skipped: 0 Warnings: 0 [Worker001] users_500000.csv: Records: 161254 Deleted: 0 Skipped: 0 Warnings: 0 [Worker000] users_500000.csv: Records: 163859 Deleted: 0 Skipped: 0 Warnings: 0 100% (32.62 MB / 32.62 MB), 5.89 MB/s File ''/path/to/users_500000.csv' (32.62 MB) was imported in 5.5399 sec at 5.89 MB/s Total rows affected in practice.account: Records: 500000 Deleted: 0 Skipped: 0 Warnings: 0
オプションなどの意味は、先ほどと変わらないので割愛。
JDBC
ここからは、Javaでプログラムを書いていきます。
<dependencies> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.23</version> </dependency> <dependency> <groupId>com.opencsv</groupId> <artifactId>opencsv</artifactId> <version>5.4</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>org.littlewings.mysql.Importer</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
実行可能JARファイルを作成します。
プログラムの雛形は、こんな感じです。
src/main/java/org/littlewings/mysql/Importer.java
package org.littlewings.mysql; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Properties; import com.opencsv.CSVReader; import com.opencsv.exceptions.CsvValidationException; public class Importer { String mode; String filePath; public static void main(String... args) throws SQLException, IOException, CsvValidationException { Importer importer = new Importer(args[0], args[1]); importer.execute(); } public Importer(String mode, String filePath) { this.mode = mode; this.filePath = filePath; } public void execute() throws SQLException, IOException, CsvValidationException { switch (mode) { case "simple": simpleInsert(); break; case "batch": batchInsert(); break; case "batch-rewrite": batchRewriteInsert(); break; default: System.out.printf("unknown mode [%s]%n", mode); break; } } 〜省略〜 private void log(String format, Object... args) { System.out.printf("[%s] %s%n", LocalDateTime.now(), String.format(format, args)); } }
実行する種類によって、simple
、batch
、batch-rewrite
の3つを用意しました。
パッケージングしたら
$ mvn package
こんな感じで実行します。
$ java -jar target/mysql-import-0.0.1-SNAPSHOT.jar [種類] users_500000.csv
では、それぞれ種類ごとに書いていきます。
単純にINSERT文を実行する
単純にINSERT文を実行するパターンのコードは、こちら。
private void simpleInsert() throws SQLException, IOException, CsvValidationException { Properties properties = new Properties(); properties.setProperty("user", "mysql_user"); properties.setProperty("password", "password"); int commitThreshold = 10000; int count = 0; long startTime = System.currentTimeMillis(); long commitStartTime = System.currentTimeMillis(); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss"); try (CSVReader csvReader = new CSVReader(Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8)); Connection conn = DriverManager.getConnection("jdbc:mysql://172.17.0.2:3306/practice", properties); PreparedStatement statement = conn.prepareStatement("insert into account(id, name, registered, about) values(?, ?, ?, ?)")) { conn.setAutoCommit(false); String[] columns; while ((columns = csvReader.readNext()) != null) { count++; statement.setInt(1, Integer.parseInt(columns[0])); statement.setString(2, columns[1]); statement.setTimestamp(3, Timestamp.valueOf(LocalDateTime.parse(columns[2], formatter))); statement.setString(4, columns[3]); statement.executeUpdate(); if (count % commitThreshold == 0) { long committedTime = System.currentTimeMillis() - commitStartTime; log("commit, %d records, lap time = %f sec...", count, committedTime * 1.0 / 1000); conn.commit(); commitStartTime = System.currentTimeMillis(); } } conn.commit(); long elapsedTime = System.currentTimeMillis() - startTime; log("%d records inserted, elapsed time = %f sec", count, elapsedTime * 1.0 / 1000); } }
CSVファイルを1行ずつ読んでINSERTし、10,000件ごとにコミットするようにしました。
実行の様子は、こちら。
$ java -jar target/mysql-import-0.0.1-SNAPSHOT.jar simple users_500000.csv [2021-04-18T02:08:52.365205] commit, 10000 records, lap time = 3.998000 sec... [2021-04-18T02:08:54.063781] commit, 20000 records, lap time = 1.643000 sec... [2021-04-18T02:08:55.629334] commit, 30000 records, lap time = 1.521000 sec... [2021-04-18T02:08:57.112967] commit, 40000 records, lap time = 1.431000 sec... [2021-04-18T02:08:58.513753] commit, 50000 records, lap time = 1.357000 sec... [2021-04-18T02:09:00.039572] commit, 60000 records, lap time = 1.455000 sec... [2021-04-18T02:09:01.461116] commit, 70000 records, lap time = 1.380000 sec... [2021-04-18T02:09:02.883278] commit, 80000 records, lap time = 1.379000 sec... [2021-04-18T02:09:04.375070] commit, 90000 records, lap time = 1.431000 sec... [2021-04-18T02:09:05.878608] commit, 100000 records, lap time = 1.464000 sec... 〜省略〜 [2021-04-18T02:09:50.253484] commit, 400000 records, lap time = 1.357000 sec... [2021-04-18T02:09:51.730666] commit, 410000 records, lap time = 1.402000 sec... [2021-04-18T02:09:53.159438] commit, 420000 records, lap time = 1.367000 sec... [2021-04-18T02:09:54.605684] commit, 430000 records, lap time = 1.394000 sec... [2021-04-18T02:09:56.128087] commit, 440000 records, lap time = 1.459000 sec... [2021-04-18T02:09:57.562830] commit, 450000 records, lap time = 1.381000 sec... [2021-04-18T02:09:59.001128] commit, 460000 records, lap time = 1.368000 sec... [2021-04-18T02:10:00.460900] commit, 470000 records, lap time = 1.386000 sec... [2021-04-18T02:10:01.823628] commit, 480000 records, lap time = 1.308000 sec... [2021-04-18T02:10:03.394428] commit, 490000 records, lap time = 1.480000 sec... [2021-04-18T02:10:04.834426] commit, 500000 records, lap time = 1.366000 sec... [2021-04-18T02:10:04.919687] 500000 records inserted, elapsed time = 76.556000 sec
バッチ更新
次は、バッチ更新。
private void batchInsert() throws SQLException, IOException, CsvValidationException { Properties properties = new Properties(); properties.setProperty("user", "mysql_user"); properties.setProperty("password", "password"); int commitThreshold = 10000; int batchThreshold = 10; int count = 0; long startTime = System.currentTimeMillis(); long commitStartTime = System.currentTimeMillis(); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss"); try (CSVReader csvReader = new CSVReader(Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8)); Connection conn = DriverManager.getConnection("jdbc:mysql://172.17.0.2:3306/practice", properties); PreparedStatement statement = conn.prepareStatement("insert into account(id, name, registered, about) values(?, ?, ?, ?)")) { conn.setAutoCommit(false); String[] columns; while ((columns = csvReader.readNext()) != null) { count++; statement.setInt(1, Integer.parseInt(columns[0])); statement.setString(2, columns[1]); statement.setTimestamp(3, Timestamp.valueOf(LocalDateTime.parse(columns[2], formatter))); statement.setString(4, columns[3]); statement.addBatch(); if (count % batchThreshold == 0) { statement.executeBatch(); } if (count % commitThreshold == 0) { long committedTime = System.currentTimeMillis() - commitStartTime; log("commit, %d records, lap time = %f sec...", count, committedTime * 1.0 / 1000); conn.commit(); commitStartTime = System.currentTimeMillis(); } } conn.commit(); long elapsedTime = System.currentTimeMillis() - startTime; log("%d records inserted, elapsed time = %f sec", count, elapsedTime * 1.0 / 1000); } }
先ほどとは違って、10件ごとにバッチ更新するようにしました。
statement.addBatch(); if (count % batchThreshold == 0) { statement.executeBatch(); }
実行の様子。
$ java -jar target/mysql-import-0.0.1-SNAPSHOT.jar batch users_500000.csv [2021-04-18T02:11:18.837457] commit, 10000 records, lap time = 4.386000 sec... [2021-04-18T02:11:20.909568] commit, 20000 records, lap time = 2.016000 sec... [2021-04-18T02:11:22.369608] commit, 30000 records, lap time = 1.411000 sec... [2021-04-18T02:11:23.981233] commit, 40000 records, lap time = 1.564000 sec... [2021-04-18T02:11:25.655595] commit, 50000 records, lap time = 1.605000 sec... [2021-04-18T02:11:27.154502] commit, 60000 records, lap time = 1.452000 sec... [2021-04-18T02:11:28.629899] commit, 70000 records, lap time = 1.435000 sec... [2021-04-18T02:11:30.353573] commit, 80000 records, lap time = 1.684000 sec... [2021-04-18T02:11:31.992758] commit, 90000 records, lap time = 1.599000 sec... [2021-04-18T02:11:33.567422] commit, 100000 records, lap time = 1.529000 sec... 〜省略〜 [2021-04-18T02:12:17.328760] commit, 400000 records, lap time = 1.351000 sec... [2021-04-18T02:12:18.712159] commit, 410000 records, lap time = 1.329000 sec... [2021-04-18T02:12:20.172459] commit, 420000 records, lap time = 1.415000 sec... [2021-04-18T02:12:21.583530] commit, 430000 records, lap time = 1.346000 sec... [2021-04-18T02:12:23.183495] commit, 440000 records, lap time = 1.525000 sec... [2021-04-18T02:12:24.645953] commit, 450000 records, lap time = 1.409000 sec... [2021-04-18T02:12:26.059431] commit, 460000 records, lap time = 1.355000 sec... [2021-04-18T02:12:27.586332] commit, 470000 records, lap time = 1.427000 sec... [2021-04-18T02:12:29.061952] commit, 480000 records, lap time = 1.403000 sec... [2021-04-18T02:12:30.537175] commit, 490000 records, lap time = 1.425000 sec... [2021-04-18T02:12:31.900668] commit, 500000 records, lap time = 1.290000 sec... [2021-04-18T02:12:31.957996] 500000 records inserted, elapsed time = 77.511000 sec
単純なINSERT文を実行する方と、ほぼ差がありません…。
あとで調べてみたら、Connector/J(MySQLのJDBCドライバ)ではバッチ更新はふつうに実行するのと変わらないみたいです…。
速度を改善するには、rewriteBatchedStatements
を使うことになりますね。
rewriteBatchedStatementsを使ったバルクINSERT
最後は、rewriteBatchedStatements
を使ったバルクINSERTです。
private void batchRewriteInsert() throws SQLException, IOException, CsvValidationException { Properties properties = new Properties(); properties.setProperty("user", "mysql_user"); properties.setProperty("password", "password"); properties.setProperty("rewriteBatchedStatements", "true"); // properties.setProperty("profileSQL", "true"); int commitThreshold = 10000; int batchThreshold = 10; int count = 0; long startTime = System.currentTimeMillis(); long commitStartTime = System.currentTimeMillis(); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss"); try (CSVReader csvReader = new CSVReader(Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8)); Connection conn = DriverManager.getConnection("jdbc:mysql://172.17.0.2:3306/practice", properties); PreparedStatement statement = conn.prepareStatement("insert into account(id, name, registered, about) values(?, ?, ?, ?)")) { conn.setAutoCommit(false); String[] columns; while ((columns = csvReader.readNext()) != null) { count++; statement.setInt(1, Integer.parseInt(columns[0])); statement.setString(2, columns[1]); statement.setTimestamp(3, Timestamp.valueOf(LocalDateTime.parse(columns[2], formatter))); statement.setString(4, columns[3]); statement.addBatch(); if (count % batchThreshold == 0) { statement.executeBatch(); } if (count % commitThreshold == 0) { long committedTime = System.currentTimeMillis() - commitStartTime; log("commit, %d records, lap time = %f sec...", count, committedTime * 1.0 / 1000); conn.commit(); commitStartTime = System.currentTimeMillis(); } } conn.commit(); long elapsedTime = System.currentTimeMillis() - startTime; log("%d records inserted, elapsed time = %f sec", count, elapsedTime * 1.0 / 1000); } }
バッチ更新のプログラムとの違いは、接続する時のプロパティでrewriteBatchedStatements
をtrue
にするだけです。
https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-performance-extensions.html
properties.setProperty("rewriteBatchedStatements", "true");
実行の様子。
$ java -jar target/mysql-import-0.0.1-SNAPSHOT.jar batch-rewrite users_500000.csv [2021-04-18T02:13:25.024357] commit, 10000 records, lap time = 3.168000 sec... [2021-04-18T02:13:25.943424] commit, 20000 records, lap time = 0.883000 sec... [2021-04-18T02:13:26.882186] commit, 30000 records, lap time = 0.910000 sec... [2021-04-18T02:13:27.540862] commit, 40000 records, lap time = 0.627000 sec... [2021-04-18T02:13:28.173199] commit, 50000 records, lap time = 0.602000 sec... [2021-04-18T02:13:28.740784] commit, 60000 records, lap time = 0.531000 sec... [2021-04-18T02:13:29.340097] commit, 70000 records, lap time = 0.568000 sec... [2021-04-18T02:13:29.860993] commit, 80000 records, lap time = 0.489000 sec... [2021-04-18T02:13:30.424720] commit, 90000 records, lap time = 0.515000 sec... [2021-04-18T02:13:30.932128] commit, 100000 records, lap time = 0.480000 sec... 〜省略〜 [2021-04-18T02:13:47.384536] commit, 400000 records, lap time = 0.449000 sec... [2021-04-18T02:13:47.899068] commit, 410000 records, lap time = 0.479000 sec... [2021-04-18T02:13:48.434318] commit, 420000 records, lap time = 0.488000 sec... [2021-04-18T02:13:48.939430] commit, 430000 records, lap time = 0.476000 sec... [2021-04-18T02:13:49.577309] commit, 440000 records, lap time = 0.591000 sec... [2021-04-18T02:13:50.101016] commit, 450000 records, lap time = 0.483000 sec... [2021-04-18T02:13:50.606555] commit, 460000 records, lap time = 0.454000 sec... [2021-04-18T02:13:51.219517] commit, 470000 records, lap time = 0.579000 sec... [2021-04-18T02:13:51.783128] commit, 480000 records, lap time = 0.473000 sec... [2021-04-18T02:13:52.529406] commit, 490000 records, lap time = 0.671000 sec... [2021-04-18T02:13:53.129715] commit, 500000 records, lap time = 0.537000 sec... [2021-04-18T02:13:53.178695] 500000 records inserted, elapsed time = 31.325000 sec
これだけで、倍以上に速くなりました。
載せていたコードではコメントアウトしていますが、profileSQL
をtrue
にするとSQL文が表示されるようになるので
https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-debugging-profiling.html
properties.setProperty("rewriteBatchedStatements", "true"); properties.setProperty("profileSQL", "true");
バッチ更新がバルクINSERTに変換されている様子が確認できます。
※値も表示されているのですが、載せていません
Sun Apr 18 02:17:00 JST 2021 INFO: [QUERY] insert into account(id, name, registered, about) values(...),(...),(...),(...),(...),(...),(...),(...),(...),(...) [Created on: Sun Apr 18 02:17:00 JST 2021, duration: 1, connection-id: 73, statement-id: 0, resultset-id: 0, at org.littlewings.mysql.Importer.batchRewriteInsert(Importer.java:181)] Sun Apr 18 02:17:00 JST 2021 INFO: [FETCH] [Created on: Sun Apr 18 02:17:00 JST 2021, duration: 0, connection-id: 73, statement-id: 0, resultset-id: 0, at org.littlewings.mysql.Importer.batchRewriteInsert(Importer.java:181)]
まとめ
MySQLに対して、LOAD DATA
、Parallel Table Import Utility、JDBCでの通常のINSERTやバルクINSERTなどを
試してみました。
LOAD DATA
やParallel Table Import Utilityが圧倒的に速いですね。
大量のテキストデータをロードするなら、やっぱりこちらなんでしょうねぇ…。
データ量が多いほど差が出そうなので、特にParallel Table Import Utilityと合わせて確認して選ぼうかなと思います。
オマケ
Stack Overflowのユーザーのデータを、XMLからCSVに変換しつつ、指定のレコード数に絞るプログラムを最後に
載せておきます。
Id
が正の値しか拾っていなかったり、AboutMe
を255文字で切り捨てたり、日時のフォーマットを変えたりといくつか
変換などをしています。
users_to_csv.groovy
import com.opencsv.CSVParser import com.opencsv.CSVParserBuilder import com.opencsv.CSVWriterBuilder import java.nio.charset.StandardCharsets import java.nio.file.Files import java.nio.file.Paths import java.time.format.DateTimeFormatter import javax.xml.stream.XMLInputFactory import javax.xml.stream.XMLStreamConstants @Grab('com.opencsv:opencsv:5.4') import com.opencsv.CSVWriterBuilder import com.opencsv.CSVParserBuilder @Grab('org.jsoup:jsoup:1.13.1') import org.jsoup.Jsoup def xmlPath = args[0] def csvSize = args[1] as int def reader = Files.newBufferedReader(Paths.get(xmlPath), StandardCharsets.UTF_8) reader.readLine() // skip BOM. def outputFile = "users_${csvSize}.csv" def csvWriter = new CSVWriterBuilder(Files.newBufferedWriter(Paths.get(outputFile), StandardCharsets.UTF_8)).withParser(new CSVParserBuilder().withEscapeChar('\\' as char).build()).build() def datetimeFormatter = DateTimeFormatter.ofPattern('uuuu-MM-dd HH:mm:ss') def count = 0 def xmlInputFactory = XMLInputFactory.newInstance() def streamReader = xmlInputFactory.createXMLStreamReader(reader) while (streamReader.hasNext()) { def eventType = streamReader.next() if (eventType == XMLStreamConstants.START_ELEMENT) { if (streamReader.getLocalName() == 'row') { def accountId = streamReader.getAttributeValue(null, "AccountId") def name = streamReader.getAttributeValue(null, 'DisplayName') def registeredDate = streamReader.getAttributeValue(null, 'CreationDate') def about = streamReader.getAttributeValue(null, "AboutMe") if (accountId == null || accountId as int < 0) { continue } registeredDate = datetimeFormatter.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(registeredDate)) if (about == null) { about = '' } else { about = Jsoup.parse(about).text() } if (about.length() > 255) { about = about.substring(0, 252) + "..." } csvWriter.writeNext(List.of(accountId, name, registeredDate, about).toArray(new String[0])) count++ if (count % 10000 == 0) { println("${count} records, generated...") } if (count == csvSize) { break } } } } streamReader.close() csvWriter.close()