CLOVER🍀

That was when it all began.

MySQL+Connector/Jを使って、大量データのSELECT⇒INSERTした時の挙動を確認する

MySQLとそのJDBCドライバ(Connnector/J)を使った時の困った罠について、こちらによくまとまったエントリがございまして。

MySQL Connector/J (JDBC ドライバ)の罠まとめ - ~saiya/hatenablog

で、この中でもちょっと気になるもの

SELECT 結果は全部メモリに載ってしまう (デフォルト設定で)

http://saiya-moebius.hatenablog.com/entry/2014/08/20/230445

について、自分でも確認してみたいと思います。

いや、全部メモリに乗ることは知っていましたし、ResultSetTypeやFetchSizeを指定すればいいのも知っていましたが、その指定をした場合にResultSetの全部の行を読まないといけないことはよく理解していなかったので…。

これを機に、自分でも見てみたいと思います!

なお、利用するMySQLバージョンは以下とし
※5.7.18で書き直しました

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 8
Server version: 5.7.18-log MySQL Community Server (GPL)

Connector/Jのバージョンは以下とします。

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.41</version>
        </dependency>

確認すること

確認することとしては、以下とします。

  • 大量のデータ+行があるテーブルAを作成する
  • テーブルAのデータは、サンプルとして動かすJavaアプリケーションのヒープに入りきらない量とする
  • テーブルAのデータからSELECTしてレコードを1件ずつ取得しつつ、テーブルBにINSERTする
  • この時の挙動と実装の組み合わせを確認する

用意したテーブルは、以下の2つ。

mysql> DESC large_source;
+-------+---------+------+-----+---------+-------+
| Field | Type    | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| id    | int(11) | NO   | PRI | 0       |       |
| value | text    | YES  |     | NULL    |       |
+-------+---------+------+-----+---------+-------+
2 rows in set (0.18 sec)

mysql> DESC large_destination;
+-------+---------+------+-----+---------+-------+
| Field | Type    | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| id    | int(11) | NO   | PRI | 0       |       |
| value | text    | YES  |     | NULL    |       |
+-------+---------+------+-----+---------+-------+
2 rows in set (0.14 sec)

先ほどの確認で、テーブルAを「large_source」、テーブルBを「large_destination」とします。

すごい単純なテーブルですが、とりあえずvalueカラムに巨大なテキストを入れ、件数と合わせてサンプルとして作るアプリケーションのヒープを上回るようにします。

SQL1本で、SELECTしてINSERTするようなSQLを用意すればよいだろうという点は、ここでは考慮しません。実際に、バッチ処理とかでこうやって1件1件読み出していろいろやって、別のテーブルに保存や更新するなど、よくありそうな内容ですので。今回は思い切り単純化しているだけですが。

データ投入

データを投入するためのプログラムとして、以下のようなものを用意。
src/main/java/org/littlewings/mysql/CreateTableAndLoadData.java

package org.littlewings.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.mysql.jdbc.AbandonedConnectionCleanupThread;

public class CreateTableAndLoadData {
    public static void main(String... args) throws SQLException, InterruptedException {
        try (Connection conn =
                     DriverManager.getConnection("jdbc:mysql://localhost:3306/practice?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&useSSL=false",
                             "kazuhira",
                             "password");
             Statement statement = conn.createStatement();
             PreparedStatement ps = conn.prepareStatement("INSERT INTO large_source(id, value) VALUES(?, ?)")) {
            statement.execute("DROP TABLES IF EXISTS large_source, large_destination");

            statement.execute("CREATE TABLE large_source(id INT, value TEXT, PRIMARY KEY(id))");
            statement.execute("CREATE TABLE large_destination(id INT, value TEXT, PRIMARY KEY(id))");

            String largeValue = IntStream.rangeClosed(1, 10737).mapToObj(i -> "A").collect(Collectors.joining(""));
            int generateRows = 50000;
            int commitCount = 10000;
            int batchSize = 500;

            conn.setAutoCommit(false);

            IntStream.rangeClosed(1, generateRows).forEach(i -> {
                try {
                    ps.setInt(1, i);
                    ps.setString(2, largeValue);
                    ps.addBatch();

                    if (i % batchSize == 0) {
                        ps.executeBatch();
                    }

                    if (i % commitCount == 0 && i != generateRows) {
                        conn.commit();
                        System.out.printf("%d rows insert done.%n", i);
                    }
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            });

            ps.executeBatch();

            conn.commit();
            System.out.printf("%d rows insert done.%n", generateRows);
        } finally {
            AbandonedConnectionCleanupThread.checkedShutdown();
        }
    }
}

テーブルは、実行時に作成します。

            statement.execute("DROP TABLES IF EXISTS large_source, large_destination");

            statement.execute("CREATE TABLE large_source(id INT, value TEXT, PRIMARY KEY(id))");
            statement.execute("CREATE TABLE large_destination(id INT, value TEXT, PRIMARY KEY(id))");

また、大きなテキスト値と生成件数は、以下のように定義します。

            String largeValue = IntStream.rangeClosed(1, 10737).mapToObj(i -> "A").collect(Collectors.joining(""));
            int generateRows = 50000;

valueカラムに設定するのは全部「A」の10,737文字の文字列ですが、中身など見ないのでいいでしょう…。

これを動かすと、データが50,000件生成されます。

10000 rows insert done.
20000 rows insert done.
30000 rows insert done.
40000 rows insert done.
50000 rows insert done.

確認。

mysql> SELECT COUNT(1) FROM large_source;
+----------+
| COUNT(1) |
+----------+
|    50000 |
+----------+
1 row in set (0.11 sec)

当然、INSERT先は0件です。

mysql> SELECT COUNT(1) FROM large_destination;
+----------+
| COUNT(1) |
+----------+
|        0 |
+----------+
1 row in set (0.00 sec)

確認用プログラム

それでは、ここで用意した50,000件のデータをSELECTして、INSERTするプログラムを書いてみます。

まずは基本部分。
src/main/java/org/littlewings/mysql/SelectInsert.java

package org.littlewings.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import com.mysql.jdbc.AbandonedConnectionCleanupThread;

public class SelectInsert {
    public static void main(String... args) throws SQLException, InterruptedException {
        try (Connection conn =
                     DriverManager.getConnection("jdbc:mysql://localhost:3306/practice?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&useSSL=false",
                     // DriverManager.getConnection("jdbc:mysql://localhost:3306/practice?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&useSSL=false&useCursorFetch=true&defaultFetchSize=100",
                             "kazuhira",
                             "password")) {
            truncate(conn);

            String mode = args.length == 0 ? "simply" : args[0];

            switch (mode) {
                case "simply":
                    simply(conn);
                    break;
                case "cursor":
                    cursor(conn);
                    break;
                case "use-cursor":
                    useCursor(conn);
                    break;
                case "limitloop":
                    limitloop(conn);
                    break;
                default:
                    System.out.printf("unknown mode [%s].%n", mode);
                    System.exit(1);
            }
        } finally {
            AbandonedConnectionCleanupThread.checkedShutdown();
            // これを入れないと、裏でスレッドが残り続ける…
            // [WARNING] thread Thread[Abandoned connection cleanup thread,5,org.littlewings.mysql.SelectInsert] was interrupted but is still alive after waiting at least 15000msecs
            // [WARNING] thread Thread[Abandoned connection cleanup thread,5,org.littlewings.mysql.SelectInsert] will linger despite being asked to die via interruption
            // [WARNING] NOTE: 1 thread(s) did not finish despite being asked to  via interruption. This is not a problem with exec:java, it is a problem with the running code. Although not serious, it should be remedied.
            // [WARNING] Couldn't destroy threadgroup org.codehaus.mojo.exec.ExecJavaMojo$IsolatedThreadGroup[name=org.littlewings.mysql.SelectInsert,maxpri=10]
            // java.lang.IllegalThreadStateException
            // 	at java.lang.ThreadGroup.destroy(ThreadGroup.java:778)
        }
    }

    private static void truncate(Connection conn) throws SQLException {
        try (Statement statement = conn.createStatement()) {
            statement.execute("TRUNCATE TABLE large_destination");
        }
    }

    // 後で
}

開始時に、データ投入先のテーブルは1度TRUNCATEしておきます。

なお、先ほどのデータ投入プログラムでも書いていましたが、以下の1文がないとスレッドが残ります。
※以前はAbandonedConnectionCleanupThread#shutdownだったのですが、非推奨になりcheckedShutdownとなったようです

            AbandonedConnectionCleanupThread.checkedShutdown();

これは、

裏で勝手にスレッドが走っている -> そしてメモリリーク

http://saiya-moebius.hatenablog.com/entry/2014/08/20/230445

として紹介されていました。

それでは、「// 後で」と書いている部分で、SELECT⇒INSERTするプログラムを書いていきます。
ヒープ使用量をオーバーしやすいように、ヒープのサイズは絞っておきます。

-Xmx512m

プログラム実行時の起動引数で、「// 後で」と書いた部分を呼び出すように実装しています。

デフォルトの場合

まずは、Connnector/Jのデフォルトの状態で試してみます。

実装は、このように。

    private static void simply(Connection conn) throws SQLException {
        // デフォルトで全部メモリに乗るので、OutOfMemorErrorコース
        // Caused by: java.lang.OutOfMemoryError: Java heap space
        //	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3387)
        //	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3334)

        int counter = 0;
        int commitCount = 10000;

        conn.setAutoCommit(false);

        try (PreparedStatement selectStatement = conn.prepareStatement("SELECT id, value FROM large_source ORDER BY id ASC");
             PreparedStatement insertStatement = conn.prepareStatement("INSERT large_destination(id, value) VALUES(?, ?)");
             ResultSet resultSet = selectStatement.executeQuery()) {
            while (resultSet.next()) {
                counter++;

                insertStatement.setInt(1, resultSet.getInt(1));
                insertStatement.setString(2, resultSet.getString(2));
                insertStatement.execute();

                insertStatement.clearParameters();

                if (counter % commitCount == 0) {
                    conn.commit();
                    System.out.printf("fetched and committed %d rows.%n", counter);
                }
            }
        }
    }

バッチ更新を行うようなパフォーマンス改善はいったん置いておいて、シンプルにSELECTの結果をINSERTに使おうとしています。

これを実行すると、コメントにも書いてありますがOutOfMemoryErrorとなります。

Caused by: java.lang.OutOfMemoryError: Java heap space
	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3512)
	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3459)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3900)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:873)
	at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1996)
	at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3410)
	at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470)
	at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3112)
	at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2341)
	at com.mysql.jdbc.ServerPreparedStatement.serverExecute(ServerPreparedStatement.java:1366)
	at com.mysql.jdbc.ServerPreparedStatement.executeInternal(ServerPreparedStatement.java:782)
	at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
	at org.littlewings.mysql.SelectInsert.simply(SelectInsert.java:71)
	at org.littlewings.mysql.SelectInsert.main(SelectInsert.java:25)
	... 6 more

PreparedStatement#executeQueryの時点で、OutOfMemoryとなるようです。

GC overhead limit exceeded」となることもありました。

Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded

まあ、ヒープが足りないというか、結果を全部クライアント側に持ってきてしまっているというわけですね。

一応、10,000行ずつコミットおよびログ出力するようなプログラムとしているのですが、

                if (counter % commitCount == 0) {
                    conn.commit();
                    System.out.printf("fetched and committed %d rows.%n", counter);
                }

何も出力されません。

ResultSetTypeやFetchSizeを指定する

最初に紹介したブログエントリや、マニュアルによるとResultSetTypeやFetchSizeを指定すると、全件メモリに乗せずとも済むようです。

MySQL :: MySQL Connector/J 8.0 Developer Guide :: 6.4 JDBC API Implementation Notes

すると、このような実装になります。

    private static void cursor(Connection conn) throws SQLException {
        // ResultSetを開いたまま、次のクエリを投げられない
        // Caused by: java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@2f586ff8 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.

        int counter = 0;
        int commitCount = 10000;

        conn.setAutoCommit(false);

        try (PreparedStatement selectStatement =
                     conn.prepareStatement("SELECT id, value FROM large_source ORDER BY id ASC", ResultSet.TYPE_FORWARD_ONLY,
                             ResultSet.CONCUR_READ_ONLY);
             PreparedStatement insertStatement = conn.prepareStatement("INSERT large_destination(id, value) VALUES(?, ?)")) {
            selectStatement.setFetchSize(Integer.MIN_VALUE);

            try (ResultSet resultSet = selectStatement.executeQuery()) {
                while (resultSet.next()) {
                    counter++;

                    insertStatement.setInt(1, resultSet.getInt(1));
                    insertStatement.setString(2, resultSet.getString(2));
                    insertStatement.execute();

                    insertStatement.clearParameters();

                    if (counter % commitCount == 0) {
                        conn.commit();
                        System.out.printf("fetched and committed %d rows.%n", counter);
                    }
                }

                conn.commit();
                System.out.printf("fetched and committed %d rows.%n", counter);
            }
        }
    }

Statement(もしくはPreparedStatement)生成時に、以下のようにResultSetのフィールド値を指定します。

        try (PreparedStatement selectStatement =
                     conn.prepareStatement("SELECT id, value FROM large_source ORDER BY id ASC", ResultSet.TYPE_FORWARD_ONLY,
                             ResultSet.CONCUR_READ_ONLY);

そして、フェッチサイズも調整しておきます。「Integer.MIN_VALUE」であることが重要です。これ以外の値を設定すると、元の全部
メモリに載せようとしてしまいます…。「Integer.MIN_VALUE」が設定のトリガーになっているのだそうで。

           selectStatement.setFetchSize(Integer.MIN_VALUE);

これでうまくいきそうですね?では、実行してみます。結果。

Caused by: java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@1561548b is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:868)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:864)
	at com.mysql.jdbc.MysqlIO.checkForOutstandingStreamingData(MysqlIO.java:3214)
	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2450)
	at com.mysql.jdbc.ServerPreparedStatement.serverExecute(ServerPreparedStatement.java:1281)
	at com.mysql.jdbc.ServerPreparedStatement.executeInternal(ServerPreparedStatement.java:782)
	at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1197)
	at org.littlewings.mysql.SelectInsert.cursor(SelectInsert.java:113)
	at org.littlewings.mysql.SelectInsert.main(SelectInsert.java:28)
	... 6 more

ここでは、PreparedStatement#executeでエラーになります。もちろん、データの更新もできていません。

これは、Connector/Jのドキュメントに書かれてある、以下の内容が原因です。

There are some caveats with this approach. You must read all of the rows in the result set (or close it) before you can issue any other queries on the connection, or an exception will be thrown.

http://dev.mysql.com/doc/connector-j/en/connector-j-reference-implementation-notes.html

つまり、全部の行を読み切ってしまうか、ResultSetを閉じない限り、同じConnnection上でクエリを発行できません、と。

これは厳しいですね…。今回のサンプルのように、大量データをSELECTしてINSERTするようなプログラムではこの方法は使えないということを意味します。
※単純に読み取りだけであれば、この方法も有効ではあるのですが

コネクションを別々にしろとでも…。

解法1(useCursorFetchとフェッチサイズを指定する)

解法のひとつとしては、JDBCの接続プロパティのuseCursorFetchをtrueに指定するとともに、フェッチサイズを0より大きくします。

DriverManager.getConnection("jdbc:mysql://localhost:3306/practice?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&useSSL=false&useCursorFetch=true&defaultFetchSize=100",

If connected to MySQL > 5.0.2, and setFetchSize() > 0 on a statement, should that statement use cursor-based fetching to retrieve rows?

Default: false

Since version: 5.0.0

https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-configuration-properties.html

ここと

useCursorFetch=true

あと、フェッチサイズはデフォルト100にしてみました。
※Statement#setFetchSizeで指定してもいいと思いますが…

defaultFetchSize=100

コードは、単純にこんな感じに。

    private static void useCursor(Connection conn) throws SQLException {
        int counter = 0;
        int commitCount = 10000;

        conn.setAutoCommit(false);

        try (PreparedStatement selectStatement =
                     conn.prepareStatement("SELECT id, value FROM large_source ORDER BY id ASC");
             PreparedStatement insertStatement = conn.prepareStatement("INSERT large_destination(id, value) VALUES(?, ?)")) {

            try (ResultSet resultSet = selectStatement.executeQuery()) {
                while (resultSet.next()) {
                    counter++;

                    insertStatement.setInt(1, resultSet.getInt(1));
                    insertStatement.setString(2, resultSet.getString(2));
                    insertStatement.execute();

                    insertStatement.clearParameters();

                    if (counter % commitCount == 0) {
                        conn.commit();
                        System.out.printf("fetched and committed %d rows.%n", counter);
                    }
                }

                conn.commit();
                System.out.printf("fetched and committed %d rows.%n", counter);
            }
        }
    }

これなら、SELECT文を投げつつINSERT(別のクエリ)も実行できるようです。実行時の情報は、こちら。

fetched and committed 10000 rows.
fetched and committed 20000 rows.
fetched and committed 30000 rows.
fetched and committed 40000 rows.
fetched and committed 50000 rows.
fetched and committed 50000 rows.

テーブルの状態。

mysql> SELECT COUNT(1) FROM large_destination;
+----------+
| COUNT(1) |
+----------+
|    50000 |
+----------+
1 row in set (0.11 sec)

mysql> SELECT MIN(id), MAX(id) FROM large_destination;
+---------+---------+
| MIN(id) | MAX(id) |
+---------+---------+
|       1 |   50000 |
+---------+---------+
1 row in set (0.00 sec)

しかし、これがうまくいくのに、どうしてこちらの方法だとうまくいかないんでしょうねぇ…。

        try (PreparedStatement selectStatement =
                     conn.prepareStatement("SELECT id, value FROM large_source ORDER BY id ASC", ResultSet.TYPE_FORWARD_ONLY,
                             ResultSet.CONCUR_READ_ONLY);
           selectStatement.setFetchSize(Integer.MIN_VALUE);
解法2(offset/limitを使う)

もうひとつの解法としては、LIMIT句のoffset/limitを使って、MySQLサーバーから取得する行の数をある程度の件数で区分けして取得すること。

こんな感じで。

    private static void limitloop(Connection conn) throws SQLException {
        int queryCounter = 0;
        int limit = 10000;

        conn.setAutoCommit(false);

        try (PreparedStatement selectStatement = conn.prepareStatement("SELECT id, value FROM large_source ORDER BY id ASC LIMIT ?, ?");
             PreparedStatement insertStatement = conn.prepareStatement("INSERT large_destination(id, value) VALUES(?, ?)")) {
            while (true) {
                int offset = queryCounter * limit;
                System.out.printf("offset %d, limit %d.%n", offset, limit);
                selectStatement.setInt(1, offset);
                selectStatement.setInt(2, limit);

                int rowCounter = 0;
                try (ResultSet resultSet = selectStatement.executeQuery()) {
                    queryCounter++;
                    while (resultSet.next()) {
                        rowCounter++;

                        insertStatement.setInt(1, resultSet.getInt(1));
                        insertStatement.setString(2, resultSet.getString(2));
                        insertStatement.execute();

                        insertStatement.clearParameters();
                    }
                }

                selectStatement.clearParameters();
                conn.commit();

                if (rowCounter == 0) {
                    break;
                } else {
                    System.out.printf("fetched and committed %d rows.%n", ((queryCounter - 1) * limit) + rowCounter);
                }
            }
        }
    }

これであれば、エラーにもならずデータをSELECT⇒INSERTすることが可能です。

実行時の情報。

offset 0, limit 10000.
fetched and committed 10000 rows.
offset 10000, limit 10000.
fetched and committed 20000 rows.
offset 20000, limit 10000.
fetched and committed 30000 rows.
offset 30000, limit 10000.
fetched and committed 40000 rows.
offset 40000, limit 10000.
fetched and committed 50000 rows.
offset 50000, limit 10000.
fetched and committed 50000 rows.

結果は、useCursorFetchを使用した場合と一緒なので割愛。

欠点としてはクエリ発行回数が増えますし、1回のクエリで扱うデータの範囲が変わってしまいます…。プログラムもちょっと書きにくくなります。

とはいえ、扱うデータの件数が多い場合はこのような実装を行うかどうかの検討が必要でしょう。

まとめ

というわけで、MySQL+Connector/Jで大量のデータをSELECTして1件1件扱いつつ、INSERTしていく例とその時にぶつかる問題について書いてみました。

割とよく踏みそうな問題な気がしますが、みなさんどうしているんでしょうね?

とりあえず、改めて確認できましたよ、と。

今回作成したプログラム全体については、以下に貼っておきます。

テーブル&データ作成。
src/main/java/org/littlewings/mysql/CreateTableAndLoadData.java

package org.littlewings.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.mysql.jdbc.AbandonedConnectionCleanupThread;

public class CreateTableAndLoadData {
    public static void main(String... args) throws SQLException, InterruptedException {
        try (Connection conn =
                     DriverManager.getConnection("jdbc:mysql://localhost:3306/practice?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&useSSL=false",
                             "kazuhira",
                             "password");
             Statement statement = conn.createStatement();
             PreparedStatement ps = conn.prepareStatement("INSERT INTO large_source(id, value) VALUES(?, ?)")) {
            statement.execute("DROP TABLES IF EXISTS large_source, large_destination");

            statement.execute("CREATE TABLE large_source(id INT, value TEXT, PRIMARY KEY(id))");
            statement.execute("CREATE TABLE large_destination(id INT, value TEXT, PRIMARY KEY(id))");

            String largeValue = IntStream.rangeClosed(1, 10737).mapToObj(i -> "A").collect(Collectors.joining(""));
            int generateRows = 50000;
            int commitCount = 10000;
            int batchSize = 500;

            conn.setAutoCommit(false);

            IntStream.rangeClosed(1, generateRows).forEach(i -> {
                try {
                    ps.setInt(1, i);
                    ps.setString(2, largeValue);
                    ps.addBatch();

                    if (i % batchSize == 0) {
                        ps.executeBatch();
                    }

                    if (i % commitCount == 0 && i != generateRows) {
                        conn.commit();
                        System.out.printf("%d rows insert done.%n", i);
                    }
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            });

            ps.executeBatch();

            conn.commit();
            System.out.printf("%d rows insert done.%n", generateRows);
        } finally {
            AbandonedConnectionCleanupThread.checkedShutdown();
        }
    }
}

SELECT⇒INSERTを行うプログラム。
src/main/java/org/littlewings/mysql/SelectInsert.java

package org.littlewings.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import com.mysql.jdbc.AbandonedConnectionCleanupThread;

public class SelectInsert {
    public static void main(String... args) throws SQLException, InterruptedException {
        try (Connection conn =
                     DriverManager.getConnection("jdbc:mysql://localhost:3306/practice?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&useSSL=false",
                     //DriverManager.getConnection("jdbc:mysql://localhost:3306/practice?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&useSSL=false&useCursorFetch=true&defaultFetchSize=100",
                             "kazuhira",
                             "password")) {
            truncate(conn);

            String mode = args.length == 0 ? "simply" : args[0];

            switch (mode) {
                case "simply":
                    simply(conn);
                    break;
                case "cursor":
                    cursor(conn);
                    break;
                case "use-cursor":
                    useCursor(conn);
                    break;
                case "limitloop":
                    limitloop(conn);
                    break;
                default:
                    System.out.printf("unknown mode [%s].%n", mode);
                    System.exit(1);
            }
        } finally {
            AbandonedConnectionCleanupThread.checkedShutdown();
            // これを入れないと、裏でスレッドが残り続ける…
            // [WARNING] thread Thread[Abandoned connection cleanup thread,5,org.littlewings.mysql.SelectInsert] was interrupted but is still alive after waiting at least 15000msecs
            // [WARNING] thread Thread[Abandoned connection cleanup thread,5,org.littlewings.mysql.SelectInsert] will linger despite being asked to die via interruption
            // [WARNING] NOTE: 1 thread(s) did not finish despite being asked to  via interruption. This is not a problem with exec:java, it is a problem with the running code. Although not serious, it should be remedied.
            // [WARNING] Couldn't destroy threadgroup org.codehaus.mojo.exec.ExecJavaMojo$IsolatedThreadGroup[name=org.littlewings.mysql.SelectInsert,maxpri=10]
            // java.lang.IllegalThreadStateException
            // 	at java.lang.ThreadGroup.destroy(ThreadGroup.java:778)
        }
    }

    private static void truncate(Connection conn) throws SQLException {
        try (Statement statement = conn.createStatement()) {
            statement.execute("TRUNCATE TABLE large_destination");
        }
    }

    private static void simply(Connection conn) throws SQLException {
        // デフォルトで全部メモリに乗るので、OutOfMemorErrorコース
        // Caused by: java.lang.OutOfMemoryError: Java heap space
        //	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3387)
        //	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3334)

        int counter = 0;
        int commitCount = 10000;

        conn.setAutoCommit(false);

        try (PreparedStatement selectStatement = conn.prepareStatement("SELECT id, value FROM large_source ORDER BY id ASC");
             PreparedStatement insertStatement = conn.prepareStatement("INSERT large_destination(id, value) VALUES(?, ?)");
             ResultSet resultSet = selectStatement.executeQuery()) {
            while (resultSet.next()) {
                counter++;

                insertStatement.setInt(1, resultSet.getInt(1));
                insertStatement.setString(2, resultSet.getString(2));
                insertStatement.execute();

                insertStatement.clearParameters();

                if (counter % commitCount == 0) {
                    conn.commit();
                    System.out.printf("fetched and committed %d rows.%n", counter);
                }
            }

            conn.commit();
            System.out.printf("fetched and committed %d rows.%n", counter);
        }
    }

    private static void cursor(Connection conn) throws SQLException {
        // ResultSetを開いたまま、次のクエリを投げられない
        // Caused by: java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@2f586ff8 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.

        int counter = 0;
        int commitCount = 10000;

        conn.setAutoCommit(false);

        try (PreparedStatement selectStatement =
                     conn.prepareStatement("SELECT id, value FROM large_source ORDER BY id ASC", ResultSet.TYPE_FORWARD_ONLY,
                             ResultSet.CONCUR_READ_ONLY);
             PreparedStatement insertStatement = conn.prepareStatement("INSERT large_destination(id, value) VALUES(?, ?)")) {
            selectStatement.setFetchSize(Integer.MIN_VALUE);

            try (ResultSet resultSet = selectStatement.executeQuery()) {
                while (resultSet.next()) {
                    counter++;

                    insertStatement.setInt(1, resultSet.getInt(1));
                    insertStatement.setString(2, resultSet.getString(2));
                    insertStatement.execute();

                    insertStatement.clearParameters();

                    if (counter % commitCount == 0) {
                        conn.commit();
                        System.out.printf("fetched and committed %d rows.%n", counter);
                    }
                }

                conn.commit();
                System.out.printf("fetched and committed %d rows.%n", counter);
            }
        }
    }

    private static void useCursor(Connection conn) throws SQLException {
        int counter = 0;
        int commitCount = 10000;

        conn.setAutoCommit(false);

        try (PreparedStatement selectStatement =
                     conn.prepareStatement("SELECT id, value FROM large_source ORDER BY id ASC");
             PreparedStatement insertStatement = conn.prepareStatement("INSERT large_destination(id, value) VALUES(?, ?)")) {

            try (ResultSet resultSet = selectStatement.executeQuery()) {
                while (resultSet.next()) {
                    counter++;

                    insertStatement.setInt(1, resultSet.getInt(1));
                    insertStatement.setString(2, resultSet.getString(2));
                    insertStatement.execute();

                    insertStatement.clearParameters();

                    if (counter % commitCount == 0) {
                        conn.commit();
                        System.out.printf("fetched and committed %d rows.%n", counter);
                    }
                }

                conn.commit();
                System.out.printf("fetched and committed %d rows.%n", counter);
            }
        }
    }


    private static void limitloop(Connection conn) throws SQLException {
        int queryCounter = 0;
        int limit = 10000;

        conn.setAutoCommit(false);

        try (PreparedStatement selectStatement = conn.prepareStatement("SELECT id, value FROM large_source ORDER BY id ASC LIMIT ?, ?");
             PreparedStatement insertStatement = conn.prepareStatement("INSERT large_destination(id, value) VALUES(?, ?)")) {
            while (true) {
                int offset = queryCounter * limit;
                System.out.printf("offset %d, limit %d.%n", offset, limit);
                selectStatement.setInt(1, offset);
                selectStatement.setInt(2, limit);

                int rowCounter = 0;
                try (ResultSet resultSet = selectStatement.executeQuery()) {
                    queryCounter++;
                    while (resultSet.next()) {
                        rowCounter++;

                        insertStatement.setInt(1, resultSet.getInt(1));
                        insertStatement.setString(2, resultSet.getString(2));
                        insertStatement.execute();

                        insertStatement.clearParameters();
                    }
                }

                selectStatement.clearParameters();
                conn.commit();

                if (rowCounter == 0) {
                    break;
                } else {
                    System.out.printf("fetched and committed %d rows.%n", ((queryCounter - 1) * limit) + rowCounter);
                }
            }
        }
    }
}