これは、なにをしたくて書いたもの?
Spring FrameworkのJDBC接続(Connection
)は、スレッドに紐付けられて管理されていたはずなので。
ここで、タスク実行の仕組みを使い、別スレッド内でデータベースアクセスを行った場合はConnection
が別になるのかなと。
そんな気はしますが、試してみよう、ということで。
SpringにおけるJDBC接続とスレッド
DataSourceTransactionManager
の説明に、記述があります。
The DataSourceTransactionManager class is a PlatformTransactionManager implementation for single JDBC datasources. It binds a JDBC connection from the specified data source to the currently executing thread, potentially allowing for one thread connection per data source.
Using DataSourceTransactionManager
やっぱり、スレッドにJDBC接続を紐付けるようですね。
お題
今回は、こんな内容でやってみます。
@Transactional
を使い、データベースアクセスするBeanを作成する- 作成したBeanを、
TaskExecutor
で実行したCallable
内でApplicationContext
からルックアップする(※) Callable
の実行は複数回行い、いずれも別スレッドで動作させる- 一部の処理は、ロールバックさせる
※なお、別スレッド内でBeanを取得するのは必須な話ではありません
TaskExecutor
とは、Springにおけるタスク実行の仕組みですね。スレッドプールが使えます。
この時、MySQLから見て複数の接続があるか、そしてコミットされるトランザクションとロールバックされるトランザクションに
分かれるか、というのを見てみます。
環境
今回の環境は、こちらです。
$ java --version openjdk 11.0.10 2021-01-19 OpenJDK Runtime Environment (build 11.0.10+9-Ubuntu-0ubuntu1.20.04) OpenJDK 64-Bit Server VM (build 11.0.10+9-Ubuntu-0ubuntu1.20.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 11.0.10, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-72-generic", arch: "amd64", family: "unix"
また、利用するデータベースはMySQL 8.0.24とし、172.17.0.2のサーバーで動作しているものとします。
テーブルは、こんな定義で用意。
create table sample( counter int, primary key(counter) );
プロジェクトを作成する
Spring Initializrで、Spring Bootプロジェクトを作成します。
$ curl -s https://start.spring.io/starter.tgz \ -d bootVersion=2.4.5 \ -d javaVersion=11 \ -d name=thread-and-jdbc-connection \ -d groupId=org.littlewings \ -d artifactId=thread-and-jdbc-connection \ -d version=0.0.1-SNAPSHOT \ -d packageName=org.littlewings.spring.jdbc \ -d dependencies=jdbc,mysql \ -d baseDir=thread-and-jdbc-connection | tar zxvf - $ cd thread-and-jdbc-connection
依存関係には、jdbc
とmysql
を含めました。
生成されたpom.xml
の依存関係など。
<properties> <java.version>11</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
ソースコード
それでは、ソースコードを書いていきます。
src/main/java/org/littlewings/spring/jdbc/QueryExecutor.java
package org.littlewings.spring.jdbc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @Component public class QueryExecutor { Logger logger = LoggerFactory.getLogger(QueryExecutor.class); JdbcTemplate jdbcTemplate; public QueryExecutor(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @Transactional public void execute(int counter) { logger.info("runner start[{}]", counter); // Thread.dumpStack(); logger.info("sleeping[{}]...", counter); int v = jdbcTemplate.queryForObject("select sleep(10)", Integer.class); logger.info("return[{}] = {}", counter, v); int inserted = jdbcTemplate.update("insert into sample(counter) values(?)", counter); logger.info("inserted[{}] = {}", counter, inserted); if (counter % 2 == 0) { logger.info("rollback[{}]", counter); throw new RuntimeException("Oops!![" + counter + "]"); } logger.info("runner end"); } }
データベースアクセスには、Spring JDBCを使いました。
メソッドには@Transactional
を使い、
@Transactional public void execute(int counter) {
接続状態の確認がしやすいようにスリープを入れつつ
logger.info("sleeping[{}]...", counter); int v = jdbcTemplate.queryForObject("select sleep(10)", Integer.class); logger.info("return[{}] = {}", counter, v);
データ登録。
int inserted = jdbcTemplate.update("insert into sample(counter) values(?)", counter); logger.info("inserted[{}] = {}", counter, inserted);
渡される引数によっては、ロールバックされるように例外を投げておきます。
if (counter % 2 == 0) { logger.info("rollback[{}]", counter); throw new RuntimeException("Oops!![" + counter + "]"); }
呼び出し側、かつmainクラス。
src/main/java/org/littlewings/spring/jdbc/App.java
package org.littlewings.spring.jdbc; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @SpringBootApplication public class App implements CommandLineRunner { Logger logger = LoggerFactory.getLogger(App.class); ApplicationContext ctx; ThreadPoolTaskExecutor taskExecutor; public static void main(String[] args) { SpringApplication.run(App.class, args); } public App(ApplicationContext ctx, ThreadPoolTaskExecutor taskExecutor) { this.ctx = ctx; this.taskExecutor = taskExecutor; } @Override public void run(String... args) throws Exception { List<Future<?>> futures = IntStream .rangeClosed(1, 10) .mapToObj(i -> taskExecutor.submit(() -> { QueryExecutor runner = ctx.getBean(QueryExecutor.class); runner.execute(i); })) .collect(Collectors.toList()); for (Future<?> future : futures) { try { future.get(); } catch (ExecutionException | InterruptedException e) { logger.info("exception[{}], message = {}", e.getClass(), e.getMessage()); } } } }
TaskExecutor
…というかThreadPoolTaskExecutor
を使い、10回ルールしてタスクを実行。この時に、別スレッド内で
ApplicationContext
から先ほど作成したBeanを取得して実行しています。
※後述しますが、Beanの取得を別スレッドで行うのは必須ではありません
@Override public void run(String... args) throws Exception { List<Future<?>> futures = IntStream .rangeClosed(1, 10) .mapToObj(i -> taskExecutor.submit(() -> { QueryExecutor runner = ctx.getBean(QueryExecutor.class); runner.execute(i); })) .collect(Collectors.toList()); for (Future<?> future : futures) { try { future.get(); } catch (ExecutionException | InterruptedException e) { logger.info("exception[{}], message = {}", e.getClass(), e.getMessage()); } } }
設定は、こちら。タスク数を10にしたので、スレッドプールのサイズとDataSource
のプールサイズを少し大きくして
おきました。
src/main/resources/application.properties
spring.task.execution.pool.core-size=16 spring.task.execution.pool.keep-alive=3s spring.datasource.url=jdbc:mysql://172.17.0.2:3306/practice spring.datasource.username=kazuhira spring.datasource.password=password spring.datasource.hikari.maximum-pool-size=16
確認してみる
では、確認してみましょう。
パッケージングして
$ mvn package
実行。
$ java -jar target/thread-and-jdbc-connection-0.0.1-SNAPSHOT.jar
10個のスレッドが動作しつつ途中で止まるので
2021-04-27 23:49:38.640 INFO 19548 --- [ task-2] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting... 2021-04-27 23:49:39.150 INFO 19548 --- [ task-2] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed. 2021-04-27 23:49:39.194 INFO 19548 --- [ task-1] o.littlewings.spring.jdbc.QueryExecutor : runner start[1] 2021-04-27 23:49:39.194 INFO 19548 --- [ task-1] o.littlewings.spring.jdbc.QueryExecutor : sleeping[1]... 2021-04-27 23:49:39.197 INFO 19548 --- [ task-9] o.littlewings.spring.jdbc.QueryExecutor : runner start[9] 2021-04-27 23:49:39.197 INFO 19548 --- [ task-9] o.littlewings.spring.jdbc.QueryExecutor : sleeping[9]... 2021-04-27 23:49:39.239 INFO 19548 --- [ task-4] o.littlewings.spring.jdbc.QueryExecutor : runner start[4] 2021-04-27 23:49:39.239 INFO 19548 --- [ task-4] o.littlewings.spring.jdbc.QueryExecutor : sleeping[4]... 2021-04-27 23:49:39.280 INFO 19548 --- [ task-5] o.littlewings.spring.jdbc.QueryExecutor : runner start[5] 2021-04-27 23:49:39.281 INFO 19548 --- [ task-5] o.littlewings.spring.jdbc.QueryExecutor : sleeping[5]... 2021-04-27 23:49:39.316 INFO 19548 --- [ task-6] o.littlewings.spring.jdbc.QueryExecutor : runner start[6] 2021-04-27 23:49:39.317 INFO 19548 --- [ task-6] o.littlewings.spring.jdbc.QueryExecutor : sleeping[6]... 2021-04-27 23:49:39.352 INFO 19548 --- [ task-2] o.littlewings.spring.jdbc.QueryExecutor : runner start[2] 2021-04-27 23:49:39.352 INFO 19548 --- [ task-2] o.littlewings.spring.jdbc.QueryExecutor : sleeping[2]... 2021-04-27 23:49:39.384 INFO 19548 --- [ task-10] o.littlewings.spring.jdbc.QueryExecutor : runner start[10] 2021-04-27 23:49:39.385 INFO 19548 --- [ task-10] o.littlewings.spring.jdbc.QueryExecutor : sleeping[10]... 2021-04-27 23:49:39.418 INFO 19548 --- [ task-8] o.littlewings.spring.jdbc.QueryExecutor : runner start[8] 2021-04-27 23:49:39.418 INFO 19548 --- [ task-8] o.littlewings.spring.jdbc.QueryExecutor : sleeping[8]... 2021-04-27 23:49:39.448 INFO 19548 --- [ task-3] o.littlewings.spring.jdbc.QueryExecutor : runner start[3] 2021-04-27 23:49:39.448 INFO 19548 --- [ task-3] o.littlewings.spring.jdbc.QueryExecutor : sleeping[3]... 2021-04-27 23:49:39.489 INFO 19548 --- [ task-7] o.littlewings.spring.jdbc.QueryExecutor : runner start[7] 2021-04-27 23:49:39.489 INFO 19548 --- [ task-7] o.littlewings.spring.jdbc.QueryExecutor : sleeping[7]...
この間に、MySQL側を見てみます。
mysql> show full processlist; +----+----------+------------------+----------+---------+------+------------+-----------------------+ | Id | User | Host | db | Command | Time | State | Info | +----+----------+------------------+----------+---------+------+------------+-----------------------+ | 10 | kazuhira | localhost | practice | Query | 0 | init | show full processlist | | 19 | kazuhira | 172.17.0.1:51530 | practice | Query | 2 | User sleep | select sleep(10) | | 20 | kazuhira | 172.17.0.1:51532 | practice | Query | 2 | User sleep | select sleep(10) | | 21 | kazuhira | 172.17.0.1:51534 | practice | Query | 2 | User sleep | select sleep(10) | | 22 | kazuhira | 172.17.0.1:51536 | practice | Query | 2 | User sleep | select sleep(10) | | 23 | kazuhira | 172.17.0.1:51538 | practice | Query | 2 | User sleep | select sleep(10) | | 24 | kazuhira | 172.17.0.1:51540 | practice | Query | 2 | User sleep | select sleep(10) | | 25 | kazuhira | 172.17.0.1:51542 | practice | Query | 2 | User sleep | select sleep(10) | | 26 | kazuhira | 172.17.0.1:51544 | practice | Query | 2 | User sleep | select sleep(10) | | 27 | kazuhira | 172.17.0.1:51546 | practice | Query | 2 | User sleep | select sleep(10) | | 28 | kazuhira | 172.17.0.1:51548 | practice | Query | 2 | User sleep | select sleep(10) | | 29 | kazuhira | 172.17.0.1:51550 | practice | Sleep | 2 | | NULL | | 30 | kazuhira | 172.17.0.1:51552 | practice | Sleep | 2 | | NULL | | 31 | kazuhira | 172.17.0.1:51554 | practice | Sleep | 2 | | NULL | | 32 | kazuhira | 172.17.0.1:51556 | practice | Sleep | 2 | | NULL | | 33 | kazuhira | 172.17.0.1:51558 | practice | Sleep | 2 | | NULL | | 34 | kazuhira | 172.17.0.1:51560 | practice | Sleep | 2 | | NULL | +----+----------+------------------+----------+---------+------+------------+-----------------------+ 17 rows in set (0.00 sec)
10個の接続があり、クエリを実行しているのが確認できます。
というわけで、スレッドごとに接続が確立されていることが確認できました。
しばらく待っていると、終了します。
2021-04-27 23:49:49.247 INFO 19548 --- [ task-1] o.littlewings.spring.jdbc.QueryExecutor : return[1] = 0 2021-04-27 23:49:49.247 INFO 19548 --- [ task-4] o.littlewings.spring.jdbc.QueryExecutor : return[4] = 0 2021-04-27 23:49:49.247 INFO 19548 --- [ task-9] o.littlewings.spring.jdbc.QueryExecutor : return[9] = 0 2021-04-27 23:49:49.268 INFO 19548 --- [ task-1] o.littlewings.spring.jdbc.QueryExecutor : inserted[1] = 1 2021-04-27 23:49:49.268 INFO 19548 --- [ task-4] o.littlewings.spring.jdbc.QueryExecutor : inserted[4] = 1 2021-04-27 23:49:49.269 INFO 19548 --- [ task-4] o.littlewings.spring.jdbc.QueryExecutor : rollback[4] 2021-04-27 23:49:49.269 INFO 19548 --- [ task-1] o.littlewings.spring.jdbc.QueryExecutor : runner end 2021-04-27 23:49:49.270 INFO 19548 --- [ task-9] o.littlewings.spring.jdbc.QueryExecutor : inserted[9] = 1 2021-04-27 23:49:49.270 INFO 19548 --- [ task-9] o.littlewings.spring.jdbc.QueryExecutor : runner end 2021-04-27 23:49:49.282 INFO 19548 --- [ task-5] o.littlewings.spring.jdbc.QueryExecutor : return[5] = 0 2021-04-27 23:49:49.284 INFO 19548 --- [ task-5] o.littlewings.spring.jdbc.QueryExecutor : inserted[5] = 1 2021-04-27 23:49:49.284 INFO 19548 --- [ task-5] o.littlewings.spring.jdbc.QueryExecutor : runner end 2021-04-27 23:49:49.318 INFO 19548 --- [ task-6] o.littlewings.spring.jdbc.QueryExecutor : return[6] = 0 2021-04-27 23:49:49.320 INFO 19548 --- [ task-6] o.littlewings.spring.jdbc.QueryExecutor : inserted[6] = 1 2021-04-27 23:49:49.320 INFO 19548 --- [ task-6] o.littlewings.spring.jdbc.QueryExecutor : rollback[6] 2021-04-27 23:49:49.354 INFO 19548 --- [ task-2] o.littlewings.spring.jdbc.QueryExecutor : return[2] = 0 2021-04-27 23:49:49.355 INFO 19548 --- [ task-2] o.littlewings.spring.jdbc.QueryExecutor : inserted[2] = 1 2021-04-27 23:49:49.355 INFO 19548 --- [ task-2] o.littlewings.spring.jdbc.QueryExecutor : rollback[2] 2021-04-27 23:49:49.385 INFO 19548 --- [ main] org.littlewings.spring.jdbc.App : exception[class java.util.concurrent.ExecutionException], message = java.lang.RuntimeException: Oops!![2] 2021-04-27 23:49:49.386 INFO 19548 --- [ task-10] o.littlewings.spring.jdbc.QueryExecutor : return[10] = 0 2021-04-27 23:49:49.388 INFO 19548 --- [ task-10] o.littlewings.spring.jdbc.QueryExecutor : inserted[10] = 1 2021-04-27 23:49:49.388 INFO 19548 --- [ task-10] o.littlewings.spring.jdbc.QueryExecutor : rollback[10] 2021-04-27 23:49:49.419 INFO 19548 --- [ task-8] o.littlewings.spring.jdbc.QueryExecutor : return[8] = 0 2021-04-27 23:49:49.421 INFO 19548 --- [ task-8] o.littlewings.spring.jdbc.QueryExecutor : inserted[8] = 1 2021-04-27 23:49:49.421 INFO 19548 --- [ task-8] o.littlewings.spring.jdbc.QueryExecutor : rollback[8] 2021-04-27 23:49:49.450 INFO 19548 --- [ task-3] o.littlewings.spring.jdbc.QueryExecutor : return[3] = 0 2021-04-27 23:49:49.451 INFO 19548 --- [ task-3] o.littlewings.spring.jdbc.QueryExecutor : inserted[3] = 1 2021-04-27 23:49:49.451 INFO 19548 --- [ task-3] o.littlewings.spring.jdbc.QueryExecutor : runner end 2021-04-27 23:49:49.470 INFO 19548 --- [ main] org.littlewings.spring.jdbc.App : exception[class java.util.concurrent.ExecutionException], message = java.lang.RuntimeException: Oops!![4] 2021-04-27 23:49:49.471 INFO 19548 --- [ main] org.littlewings.spring.jdbc.App : exception[class java.util.concurrent.ExecutionException], message = java.lang.RuntimeException: Oops!![6] 2021-04-27 23:49:49.490 INFO 19548 --- [ task-7] o.littlewings.spring.jdbc.QueryExecutor : return[7] = 0 2021-04-27 23:49:49.492 INFO 19548 --- [ task-7] o.littlewings.spring.jdbc.QueryExecutor : inserted[7] = 1 2021-04-27 23:49:49.492 INFO 19548 --- [ task-7] o.littlewings.spring.jdbc.QueryExecutor : runner end 2021-04-27 23:49:49.510 INFO 19548 --- [ main] org.littlewings.spring.jdbc.App : exception[class java.util.concurrent.ExecutionException], message = java.lang.RuntimeException: Oops!![8] 2021-04-27 23:49:49.511 INFO 19548 --- [ main] org.littlewings.spring.jdbc.App : exception[class java.util.concurrent.ExecutionException], message = java.lang.RuntimeException: Oops!![10] 2021-04-27 23:49:52.513 INFO 19548 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated... 2021-04-27 23:49:52.533 INFO 19548 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
データを見てみましょう。
mysql> select * from sample; +---------+ | counter | +---------+ | 1 | | 3 | | 5 | | 7 | | 9 | +---------+ 5 rows in set (0.00 sec)
例外を投げなかったものについては値が入り、例外を投げたものについては値が入っていない(=ロールバックされた)結果に
なりました。
というわけで、トランザクションも別々に管理できていることが確認できました、と。
あと、試しに@Transactional
なメソッドの実行中にスタックトレースも見てみましょう。
@Transactional public void execute(int counter) { logger.info("runner start[{}]", counter); Thread.dumpStack();
ちゃんと、トランザクション関係のインターセプターが入っていますね。
java.lang.Exception: Stack trace at java.base/java.lang.Thread.dumpStack(Thread.java:1388) at org.littlewings.spring.jdbc.QueryExecutor.execute(QueryExecutor.java:23) at org.littlewings.spring.jdbc.QueryExecutor$$FastClassBySpringCGLIB$$4f46783.invoke(<generated>) at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692) at org.littlewings.spring.jdbc.QueryExecutor$$EnhancerBySpringCGLIB$$1dda2c29.execute(<generated>) at org.littlewings.spring.jdbc.App.lambda$run$0(App.java:40) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834)
このあたりです。
トランザクションを開始している場所は、このあたりを見るとよいのではないでしょうか。
補足
実は、今回のmainクラスのように別スレッド内でBeanを取得するのではなく、元のスレッドで取得したBeanを
別スレッドに渡して実行することでも、今回の確認した内容(=接続もトランザクションも別々になる)とは
同じ結果になります。
final QueryExecutor runner = ctx.getBean(QueryExecutor.class); // 呼び出し元のスレッド側でBeanを取得(DIされたものでもよい) List<Future<?>> futures = IntStream .rangeClosed(1, 10) .mapToObj(i -> taskExecutor.submit(() -> { // QueryExecutor runner = ctx.getBean(QueryExecutor.class); // ここで取得しなくてもよい runner.execute(i); })) .collect(Collectors.toList());
ですが、見た目的にちょっとわかりにくい気がするんですよね…。
結局のところ、データベース接続とスレッドが関連づけられているので、Bean自体をどこで取得しようが
データベースまわりの処理がどのスレッドで実行されるかだけに依存しているだけなので。
個人的には、スレッドが異なることが明示的になっている方がわかりやすいのかなぁとは思います。
まとめ
Spring FrameworkのJDBC接続がスレッドに紐付けられていることを確認し、オマケでトランザクションまわりのコードを
少し見てみました。
予想通りの結果だったりはするのですが、自分で確認しておきたかったのでよいかな、と。