CLOVER🍀

That was when it all began.

Checking how JobParametersIncrementer behaves with Spring Boot (JobLauncherApplicationRunner) × Spring Batch (+ JsrJobParametersConverter)

What is this post for?

I got curious about where JobParametersIncrementer fits in when Spring Boot and Spring Batch are used together.

JobParametersIncrementer

JobParametersIncrementer is described here.

Configuring and Running a Job / Advanced Meta-Data Usage / JobParametersIncrementer

It is explained in connection with JobOperator, and the documentation says that the JobOperator#startNextInstance method always starts a new Job instance.

the startNextInstance method is worth noting. This method will always start a new instance of a Job.

Unlike JobLauncher, which needs a new JobParameters object (with values different from any previous set) to trigger a new JobInstance, the startNextInstance method apparently uses the JobParametersIncrementer tied to the Job to create a new Job instance.

Unlike JobLauncher though, which requires a new JobParameters object that will trigger a new JobInstance if the parameters are different from any previous set of parameters, the startNextInstance method will use the JobParametersIncrementer tied to the Job to force the Job to a new instance:

JobParametersIncrementer has a method, getNext(JobParameters), that returns a JobParameters.

JobParametersIncrementer (Spring Batch 4.3.5 API)

Given a JobParameters, a JobParametersIncrementer increments whatever values are necessary and returns the next JobParameters.

The contract of JobParametersIncrementer is that, given a JobParameters object, it will return the 'next' JobParameters object by incrementing any necessary values it may contain.

What exactly gets incremented is abstracted away by JobParametersIncrementer.

The provided implementations are RunIdIncrementer and DataFieldMaxValueJobParametersIncrementer, and you can also implement your own.

RunIdIncrementer (Spring Batch 4.3.5 API)

DataFieldMaxValueJobParametersIncrementer (Spring Batch 4.3.5 API)

RunIdIncrementer uses a parameter named run.id: it starts at 1 on the first run and increments run.id on each run after that.
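The run.id behavior can be pictured with a minimal plain-Java sketch. This is not the Spring Batch class: RunIdSketch is a hypothetical name, and a plain Map stands in for JobParameters so that the example runs without any dependencies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a run.id style incrementer (not the Spring Batch API).
// A plain Map plays the role of JobParameters.
final class RunIdSketch {
    static final String KEY = "run.id";

    // Returns a copy of the given parameters with run.id set to 1 when it is
    // absent, or to the previous value + 1 when it is present.
    static Map<String, Object> getNext(Map<String, ?> parameters) {
        Map<String, Object> next = new LinkedHashMap<>(parameters);
        Object current = next.get(KEY);
        long id = (current instanceof Long) ? ((Long) current) + 1 : 1L;
        next.put(KEY, id);
        return next;
    }

    public static void main(String[] args) {
        Map<String, Object> first = getNext(Map.of());
        Map<String, Object> second = getNext(first);
        System.out.println(first);  // {run.id=1}
        System.out.println(second); // {run.id=2}
    }
}
```

Note that the input map is copied rather than modified, so any other parameters passed in are carried along unchanged.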

DataFieldMaxValueJobParametersIncrementer uses Spring JDBC's DataFieldMaxValueIncrementer. In short, it increments using a feature equivalent to a database sequence.
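To contrast this with the run.id approach, here is a rough plain-Java sketch of a sequence-backed incrementer. Everything in it is an assumption for illustration: SequenceIncrementerSketch is a hypothetical class, a LongSupplier stands in for the database sequence, and a Map stands in for JobParameters.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical sketch of a sequence-backed incrementer (not the Spring Batch class).
final class SequenceIncrementerSketch {
    private final String key;            // parameter name to overwrite (assumed)
    private final LongSupplier sequence; // stand-in for a database sequence

    SequenceIncrementerSketch(String key, LongSupplier sequence) {
        this.key = key;
        this.sequence = sequence;
    }

    // The new value comes from the sequence, not from the previous parameter value.
    Map<String, Object> getNext(Map<String, ?> parameters) {
        Map<String, Object> next = new LinkedHashMap<>(parameters);
        next.put(key, sequence.getAsLong());
        return next;
    }
}
```

The point of the design is that the source of truth for the next value lives outside the parameters themselves, so the value stays unique even if the previous parameters are unknown.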

DataFieldMaxValueIncrementer (Spring Framework 5.3.19 API)

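I suspect RunIdIncrementer is the one most commonly used.

There is something about the relationship between JobParametersIncrementer and JobParameters that I am a little curious about, so this time I'll run it and check.

Environment

Here is the environment used this time.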

$ java --version
openjdk 17.0.3 2022-04-19
OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-109-generic", arch: "amd64", family: "unix"

MySQL is used as the database for the JobRepository.

$ mysql --version
mysql  Ver 8.0.29 for Linux on x86_64 (MySQL Community Server - GPL)

MySQL is assumed to be running at 172.17.0.2.

Creating the project

First, create a Spring Boot project. The dependencies are batch and mysql.

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.6.7 \
  -d javaVersion=17 \
  -d name=batch-jobparameters-incrementer \
  -d groupId=org.littlewings \
  -d artifactId=batch-jobparameters-incrementer \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.batch \
  -d dependencies=batch,mysql \
  -d baseDir=batch-jobparameters-incrementer | tar zxvf -

Move into the created project.

$ cd batch-jobparameters-incrementer

The Maven dependencies and plugin settings are as follows.

        <properties>
                <java.version>17</java.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-batch</artifactId>
                </dependency>

                <dependency>
                        <groupId>mysql</groupId>
                        <artifactId>mysql-connector-java</artifactId>
                        <scope>runtime</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.batch</groupId>
                        <artifactId>spring-batch-test</artifactId>
                        <scope>test</scope>
                </dependency>
        </dependencies>

        <build>
                <plugins>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
                        </plugin>
                </plugins>
        </build>

Delete the generated source code.

$ rm src/main/java/org/littlewings/spring/batch/BatchJobparametersIncrementerApplication.java src/test/java/org/littlewings/spring/batch/BatchJobparametersIncrementerApplicationTests.java

Now let's write the source code.

Writing the source code

In line with this post's theme, let's build an application that uses JobParameters and a JobParametersIncrementer.

Here is the Job configuration.

src/main/java/org/littlewings/spring/batch/JobConfig.java

package org.littlewings.spring.batch;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.converter.JobParametersConverter;
import org.springframework.batch.core.jsr.JsrJobParametersConverter;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JobConfig {
    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job withJobParametersIncrementerJob() {
        return jobBuilderFactory
                .get("withJobParametersIncrementerJob")
                //.incrementer(new RunIdIncrementer())
                .start(withJobParametersIncrementerStep())
                .build();
    }

    @Bean
    Step withJobParametersIncrementerStep() {
        return stepBuilderFactory
                .get("withJobParametersIncrementerStep")
                .tasklet(jobParametersLoggingTasklet())
                .build();
    }

    @Bean
    @StepScope
    public Tasklet jobParametersLoggingTasklet() {
        return (contribution, chunkContext) -> {
            System.out.println("----- start -----");

            chunkContext
                    .getStepContext()
                    .getJobParameters()
                    .entrySet()
                    .forEach(entry -> System.out.printf("parameter: %s = %s%n", entry.getKey(), entry.getValue()));

            System.out.println("----- end -----");

            return RepeatStatus.FINISHED;
        };
    }
}

At first, the JobParametersIncrementer (RunIdIncrementer this time) is left commented out.

    @Bean
    public Job withJobParametersIncrementerJob() {
        return jobBuilderFactory
                .get("withJobParametersIncrementerJob")
                //.incrementer(new RunIdIncrementer())
                .start(withJobParametersIncrementerStep())
                .build();
    }

The processing inside the Step is defined with a Tasklet. It simply writes every parameter contained in the JobParameters to standard output.

    @Bean
    @StepScope
    public Tasklet jobParametersLoggingTasklet() {
        return (contribution, chunkContext) -> {
            System.out.println("----- start -----");

            chunkContext
                    .getStepContext()
                    .getJobParameters()
                    .entrySet()
                    .forEach(entry -> System.out.printf("parameter: %s = %s%n", entry.getKey(), entry.getValue()));

            System.out.println("----- end -----");

            return RepeatStatus.FINISHED;
        };
    }

The main class.

src/main/java/org/littlewings/spring/batch/App.java

package org.littlewings.spring.batch;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

The application configuration.

src/main/resources/application.properties

spring.datasource.url=jdbc:mysql://172.17.0.2:3306/practice?characterEncoding=UTF-8
spring.datasource.username=kazuhira
spring.datasource.password=password

spring.batch.jdbc.initialize-schema=always

With that, the base part is complete for now.

Running without a JobParametersIncrementer

First, let's run it without a JobParametersIncrementer.

Package it.

$ mvn package

The first run is with no arguments.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar

Log.

2022-05-10 01:21:04.498  INFO 20166 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2022-05-10 01:21:04.631  INFO 20166 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{}]
2022-05-10 01:21:04.716  INFO 20166 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
----- end -----
2022-05-10 01:21:04.789  INFO 20166 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 72ms
2022-05-10 01:21:04.849  INFO 20166 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 180ms

Naturally, there are no parameters to retrieve from JobParameters, and the log also shows that none were specified at launch.

2022-05-10 01:21:04.631  INFO 20166 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{}]



----- start -----
----- end -----

Run it again.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar

Under these conditions, the Job with no JobParameters has already completed, so the step is not executed again and the run simply ends.

2022-05-10 01:21:34.676  INFO 20230 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2022-05-10 01:21:34.825  INFO 20230 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{}]
2022-05-10 01:21:34.874  INFO 20230 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=withJobParametersIncrementerStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=
2022-05-10 01:21:34.898  INFO 20230 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 34ms

Next, run it with parameters. I specified two.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar param1=value1-1 param2=value2-1

Log.

2022-05-10 01:21:59.828  INFO 20304 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [param1=value1-1, param2=value2-1]
2022-05-10 01:21:59.962  INFO 20304 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{param1=value1-1, param2=value2-1}]
2022-05-10 01:22:00.041  INFO 20304 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: param1 = value1-1
parameter: param2 = value2-1
----- end -----
2022-05-10 01:22:00.127  INFO 20304 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 85ms
2022-05-10 01:22:00.168  INFO 20304 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{param1=value1-1, param2=value2-1}] and the following status: [COMPLETED] in 169ms

The two specified JobParameters are recognized.

2022-05-10 01:21:59.828  INFO 20304 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [param1=value1-1, param2=value2-1]
2022-05-10 01:21:59.962  INFO 20304 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{param1=value1-1, param2=value2-1}]
2022-05-10 01:22:00.041  INFO 20304 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: param1 = value1-1
parameter: param2 = value2-1
----- end -----

Run with only one JobParameter.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar param2=value2-2

Only one JobParameter is recognized.

2022-05-10 01:23:01.238  INFO 20390 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [param2=value2-2]
2022-05-10 01:23:01.436  INFO 20390 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{param2=value2-2}]
2022-05-10 01:23:01.534  INFO 20390 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: param2 = value2-2
----- end -----
2022-05-10 01:23:01.623  INFO 20390 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 88ms
2022-05-10 01:23:01.693  INFO 20390 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{param2=value2-2}] and the following status: [COMPLETED] in 199ms

Let's specify the same JobParameters as in the two-parameter run once more.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar param1=value1-1 param2=value2-1

In this case, rather than ending without executing, it results in an exception.

2022-05-10 01:23:54.373  INFO 20463 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [param1=value1-1, param2=value2-1]
2022-05-10 01:23:54.476  INFO 20463 --- [           main] ConditionEvaluationReportLoggingListener :

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2022-05-10 01:23:54.493 ERROR 20463 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute ApplicationRunner
        at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:771) ~[spring-boot-2.6.7.jar!/:2.6.7]
        at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:758) ~[spring-boot-2.6.7.jar!/:2.6.7]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:310) ~[spring-boot-2.6.7.jar!/:2.6.7]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312) ~[spring-boot-2.6.7.jar!/:2.6.7]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) ~[spring-boot-2.6.7.jar!/:2.6.7]
        at org.littlewings.spring.batch.App.main(App.java:11) ~[classes!/:0.0.1-SNAPSHOT]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
        at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
        at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) ~[batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:108) ~[batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) ~[batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
        at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88) ~[batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={param1=value1-1, param2=value2-1}.  If you want to run this job again, change the parameters.
        at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:139) ~[spring-batch-core-4.3.5.jar!/:4.3.5]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
        at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.19.jar!/:5.3.19]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.19.jar!/:5.3.19]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.19.jar!/:5.3.19]
        at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.19.jar!/:5.3.19]
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.19.jar!/:5.3.19]
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.19.jar!/:5.3.19]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.19.jar!/:5.3.19]
        at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:181) ~[spring-batch-core-4.3.5.jar!/:4.3.5]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.19.jar!/:5.3.19]
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.19.jar!/:5.3.19]
        at jdk.proxy2/jdk.proxy2.$Proxy46.createJobExecution(Unknown Source) ~[na:na]
        at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137) ~[spring-batch-core-4.3.5.jar!/:4.3.5]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
        at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.19.jar!/:5.3.19]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.19.jar!/:5.3.19]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.19.jar!/:5.3.19]
        at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:128) ~[spring-batch-core-4.3.5.jar!/:4.3.5]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.19.jar!/:5.3.19]
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.19.jar!/:5.3.19]
        at jdk.proxy2/jdk.proxy2.$Proxy53.run(Unknown Source) ~[na:na]
        at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.execute(JobLauncherApplicationRunner.java:199) ~[spring-boot-autoconfigure-2.6.7.jar!/:2.6.7]
        at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.executeLocalJobs(JobLauncherApplicationRunner.java:173) ~[spring-boot-autoconfigure-2.6.7.jar!/:2.6.7]
        at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.launchJobFromProperties(JobLauncherApplicationRunner.java:160) ~[spring-boot-autoconfigure-2.6.7.jar!/:2.6.7]
        at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:155) ~[spring-boot-autoconfigure-2.6.7.jar!/:2.6.7]
        at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:150) ~[spring-boot-autoconfigure-2.6.7.jar!/:2.6.7]
        at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:768) ~[spring-boot-2.6.7.jar!/:2.6.7]
        ... 13 common frames omitted
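The outcomes seen so far can be summarized in a small plain-Java model. It only mirrors what the logs in this post show and is not the library's implementation: JobInstanceSketch, Outcome, and the use of IllegalStateException are hypothetical choices made for illustration.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of the observed behavior (not Spring Batch code):
// a job instance is identified by the job name plus its parameters.
final class JobInstanceSketch {
    enum Outcome { EXECUTED, STEP_SKIPPED }

    private final Set<String> completed = new HashSet<>();

    Outcome launch(String jobName, Map<String, String> parameters) {
        // Sort the keys so the same parameter set always yields the same identity.
        String identity = jobName + "|" + new TreeMap<>(parameters);
        if (completed.add(identity)) {
            return Outcome.EXECUTED;      // first time for this identity
        }
        if (parameters.isEmpty()) {
            return Outcome.STEP_SKIPPED;  // observed: launch logged, step not re-run
        }
        // Observed: non-empty parameters of a completed instance are rejected.
        throw new IllegalStateException(
                "A job instance already exists and is complete for parameters=" + new TreeMap<>(parameters));
    }
}
```

Replaying the runs above against this model gives EXECUTED, STEP_SKIPPED, EXECUTED, EXECUTED, and then an exception for the repeated param1=value1-1 param2=value2-1 pair.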

Keep the JobParameter names the same and change only the values.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar param1=value1-2 param2=value2-3

This launches without any problem.

2022-05-10 01:24:46.195  INFO 20540 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [param1=value1-2, param2=value2-3]
2022-05-10 01:24:46.447  INFO 20540 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{param1=value1-2, param2=value2-3}]
2022-05-10 01:24:46.582  INFO 20540 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: param1 = value1-2
parameter: param2 = value2-3
----- end -----
2022-05-10 01:24:46.679  INFO 20540 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 96ms
2022-05-10 01:24:46.731  INFO 20540 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{param1=value1-2, param2=value2-3}] and the following status: [COMPLETED] in 226ms

That's enough checking for this pattern.

Recreate the database once.

mysql> drop database practice;
Query OK, 9 rows affected (0.84 sec)

mysql> create database practice;
Query OK, 1 row affected (0.08 sec)

Applying a JobParametersIncrementer

Next, let's apply a JobParametersIncrementer. RunIdIncrementer is the implementation used, so uncomment that line:

    @Bean
    public Job withJobParametersIncrementerJob() {
        return jobBuilderFactory
                .get("withJobParametersIncrementerJob")
                .incrementer(new RunIdIncrementer())
                .start(withJobParametersIncrementerStep())
                .build();
    }

Then package it.

$ mvn package

First, run with no JobParameters.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar

Log.

2022-05-10 01:31:35.871  INFO 21255 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2022-05-10 01:31:36.029  INFO 21255 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{run.id=1}]
2022-05-10 01:31:36.143  INFO 21255 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: run.id = 1
----- end -----
2022-05-10 01:31:36.240  INFO 21255 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 96ms
2022-05-10 01:31:36.291  INFO 21255 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 220ms

Although it was not specified at launch, a JobParameter named run.id appears.

2022-05-10 01:31:35.871  INFO 21255 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2022-05-10 01:31:36.029  INFO 21255 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{run.id=1}]
2022-05-10 01:31:36.143  INFO 21255 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: run.id = 1
----- end -----

As noted earlier, RunIdIncrementer is a class that increments the JobParameter named run.id. If the parameter does not exist, it is initialized to 1.

This incrementer increments a "run.id" parameter of type Long from the given job parameters. If the parameter does not exist, it will be initialized to 1.

RunIdIncrementer (Spring Batch 4.3.5 API)

Let's run once more with no arguments.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar

Unlike the case without a JobParametersIncrementer, this time the Job launches.

2022-05-10 01:32:35.690  INFO 21336 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2022-05-10 01:32:35.914  INFO 21336 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{run.id=2}]
2022-05-10 01:32:36.022  INFO 21336 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: run.id = 2
----- end -----
2022-05-10 01:32:36.117  INFO 21336 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 95ms
2022-05-10 01:32:36.171  INFO 21336 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{run.id=2}] and the following status: [COMPLETED] in 209ms

This time, the value of run.id has been incremented to 2.

This value is stored in the BATCH_JOB_EXECUTION_PARAMS table.

mysql> select * from BATCH_JOB_EXECUTION_PARAMS;
+------------------+---------+----------+------------+----------------------------+----------+------------+-------------+
| JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | STRING_VAL | DATE_VAL                   | LONG_VAL | DOUBLE_VAL | IDENTIFYING |
+------------------+---------+----------+------------+----------------------------+----------+------------+-------------+
|                1 | LONG    | run.id   |            | 1970-01-01 09:00:00.000000 |        1 |          0 | Y           |
|                2 | LONG    | run.id   |            | 1970-01-01 09:00:00.000000 |        2 |          0 | Y           |
+------------------+---------+----------+------------+----------------------------+----------+------------+-------------+
2 rows in set (0.00 sec)

Next, run with two JobParameters.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar param1=value1-1 param2=value2-1

run.id was incremented, and the two JobParameters were added alongside it.

2022-05-10 01:37:26.677  INFO 21652 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [param1=value1-1, param2=value2-1]
2022-05-10 01:37:26.884  INFO 21652 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{run.id=3, param1=value1-1, param2=value2-1}]
2022-05-10 01:37:27.017  INFO 21652 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: run.id = 3
parameter: param1 = value1-1
parameter: param2 = value2-1
----- end -----
2022-05-10 01:37:27.108  INFO 21652 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 90ms
2022-05-10 01:37:27.164  INFO 21652 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{run.id=3, param1=value1-1, param2=value2-1}] and the following status: [COMPLETED] in 226ms

This time, run with a single JobParameter, keeping the name the same but changing the value.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar param2=value2-2

Log.

2022-05-10 01:38:12.017  INFO 21743 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [param2=value2-2]
2022-05-10 01:38:12.221  INFO 21743 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{run.id=4, param1=value1-1, param2=value2-2}]
2022-05-10 01:38:12.343  INFO 21743 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: run.id = 4
parameter: param1 = value1-1
parameter: param2 = value2-2
----- end -----
2022-05-10 01:38:12.448  INFO 21743 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 104ms
2022-05-10 01:38:12.551  INFO 21743 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{run.id=4, param1=value1-1, param2=value2-2}] and the following status: [COMPLETED] in 245ms

Looking closely, only one JobParameter was specified at launch:

2022-05-10 01:38:12.017  INFO 21743 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [param2=value2-2]

Yet in addition to run.id, one more JobParameter is recognized.

2022-05-10 01:38:12.221  INFO 21743 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{run.id=4, param1=value1-1, param2=value2-2}]
2022-05-10 01:38:12.343  INFO 21743 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: run.id = 4
parameter: param1 = value1-1
parameter: param2 = value2-2
----- end -----

The contents of the BATCH_JOB_EXECUTION_PARAMS table at this point.

mysql> select * from BATCH_JOB_EXECUTION_PARAMS;
+------------------+---------+----------+------------+----------------------------+----------+------------+-------------+
| JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | STRING_VAL | DATE_VAL                   | LONG_VAL | DOUBLE_VAL | IDENTIFYING |
+------------------+---------+----------+------------+----------------------------+----------+------------+-------------+
|                1 | LONG    | run.id   |            | 1970-01-01 09:00:00.000000 |        1 |          0 | Y           |
|                2 | LONG    | run.id   |            | 1970-01-01 09:00:00.000000 |        2 |          0 | Y           |
|                3 | LONG    | run.id   |            | 1970-01-01 09:00:00.000000 |        3 |          0 | Y           |
|                3 | STRING  | param1   | value1-1   | 1970-01-01 09:00:00.000000 |        0 |          0 | Y           |
|                3 | STRING  | param2   | value2-1   | 1970-01-01 09:00:00.000000 |        0 |          0 | Y           |
|                4 | LONG    | run.id   |            | 1970-01-01 09:00:00.000000 |        4 |          0 | Y           |
|                4 | STRING  | param1   | value1-1   | 1970-01-01 09:00:00.000000 |        0 |          0 | Y           |
|                4 | STRING  | param2   | value2-2   | 1970-01-01 09:00:00.000000 |        0 |          0 | Y           |
+------------------+---------+----------+------------+----------------------------+----------+------------+-------------+
8 rows in set (0.00 sec)

Apparently, a JobParameter that was not specified is carried over from the previous execution.

Next, let's specify the same values as in the first run that used two JobParameters.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar param1=value1-1 param2=value2-1

Without a JobParametersIncrementer this pattern threw an exception, but this time it succeeds.

2022-05-10 01:40:24.813  INFO 21889 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [param1=value1-1, param2=value2-1]
2022-05-10 01:40:25.008  INFO 21889 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{run.id=5, param1=value1-1, param2=value2-1}]
2022-05-10 01:40:25.102  INFO 21889 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: run.id = 5
parameter: param1 = value1-1
parameter: param2 = value2-1
----- end -----
2022-05-10 01:40:25.179  INFO 21889 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 77ms
2022-05-10 01:40:25.230  INFO 21889 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{run.id=5, param1=value1-1, param2=value2-1}] and the following status: [COMPLETED] in 171ms

Now let's change the values.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar param1=value1-2 param2=value2-3

This works as well.

2022-05-10 01:41:23.113  INFO 22000 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [param1=value1-2, param2=value2-3]
2022-05-10 01:41:23.327  INFO 22000 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{run.id=6, param1=value1-2, param2=value2-3}]
2022-05-10 01:41:23.445  INFO 22000 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: run.id = 6
parameter: param1 = value1-2
parameter: param2 = value2-3
----- end -----
2022-05-10 01:41:23.570  INFO 22000 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 124ms
2022-05-10 01:41:23.644  INFO 22000 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{run.id=6, param1=value1-2, param2=value2-3}] and the following status: [COMPLETED] in 251ms

Finally, let's go back to specifying no JobParameter.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar

As expected, the JobParameter values from the previous run are restored.

2022-05-10 01:42:16.446  INFO 22086 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2022-05-10 01:42:16.623  INFO 22086 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{run.id=7, param1=value1-2, param2=value2-3}]
2022-05-10 01:42:16.734  INFO 22086 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: run.id = 7
parameter: param1 = value1-2
parameter: param2 = value2-3
----- end -----
2022-05-10 01:42:16.813  INFO 22086 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 79ms
2022-05-10 01:42:16.874  INFO 22086 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{run.id=7, param1=value1-2, param2=value2-3}] and the following status: [COMPLETED] in 210ms

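Judging from this behavior, once a JobParameter has been specified, its value is restored on the next run, and any value given explicitly overwrites the restored one.

Now let's look at the Spring Boot documentation on Spring Batch.

It says that batch execution is performed by JobLauncherApplicationRunner.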

By default, it executes all Jobs in the application context on startup (see JobLauncherApplicationRunner for details).

“How-to” Guides / Batch Applications / Running Spring Batch Jobs on Startup

Looking at the source code of JobLauncherApplicationRunner, when a JobParametersIncrementer is specified, the JobParameters given explicitly at launch appear to be merged with the previous values before the job runs.

https://github.com/spring-projects/spring-boot/blob/v2.6.7/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunner.java#L209-L214
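
To make the merge concrete, here is a minimal plain-Java sketch of the behavior observed in the logs above. It is a simplified model, not the Spring Boot source: parameters are plain strings in a Map, and the class and method names (ParameterMergeSketch, next, merge) are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of "previous parameters + increment + command line".
// Not the actual Spring Boot code; all names here are hypothetical.
class ParameterMergeSketch {
    // Stand-in for RunIdIncrementer: copy the previous parameters and bump run.id.
    static Map<String, String> next(Map<String, String> previous) {
        Map<String, String> next = new LinkedHashMap<>(previous);
        long runId = Long.parseLong(previous.getOrDefault("run.id", "0"));
        next.put("run.id", String.valueOf(runId + 1));
        return next;
    }

    // The incremented previous parameters go in first,
    // then the values given on the command line overwrite them.
    static Map<String, String> merge(Map<String, String> previous, Map<String, String> commandLine) {
        Map<String, String> merged = new LinkedHashMap<>(next(previous));
        merged.putAll(commandLine);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> previous = new LinkedHashMap<>();
        previous.put("run.id", "3");
        previous.put("param1", "value1-1");
        previous.put("param2", "value2-1");

        // prints {run.id=4, param1=value1-1, param2=value2-2}
        System.out.println(merge(previous, Map.of("param2", "value2-2")));
    }
}
```

The result matches the run where only param2=value2-2 was given: param1 comes from the previous execution, and param2 is overwritten.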

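I won't look at the other ways of launching a Spring Batch Job this time, but since JobParametersIncrementer itself is described as something that creates the "next JobParameters", this may simply be how it is meant to work.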

The contract of JobParametersIncrementer is that, given a JobParameters object, it will return the 'next' JobParameters object by incrementing any necessary values it may contain.

Configuring and Running a Job / Advanced Meta-Data Usage / JobParametersIncrementer

So with a JobParametersIncrementer you can run a Job while incrementing a JobParameter, but the way JobParameters themselves are handled changes somewhat.

Trying JsrJobParametersConverter

By the way, RunIdIncrementer lets you run a Job while incrementing run.id, but there is also JsrJobParametersConverter, which is not a JobParametersIncrementer yet offers a similar feature, so let's try it too.

Stop using RunIdIncrementer,

    @Bean
    public Job withJobParametersIncrementerJob() {
        return jobBuilderFactory
                .get("withJobParametersIncrementerJob")
                //.incrementer(new RunIdIncrementer())
                .start(withJobParametersIncrementerStep())
                .build();
    }

and define JsrJobParametersConverter as a Bean.

    @Bean
    public JobParametersConverter jobParametersConverter(DataSource dataSource) {
        return new JsrJobParametersConverter(dataSource);
    }

Recreate the database.

mysql> drop database practice;
Query OK, 9 rows affected (0.43 sec)

mysql> create database practice;
Query OK, 1 row affected (0.03 sec)

Package.

$ mvn package

From here, I run the same patterns as when the JobParametersIncrementer (RunIdIncrementer) was specified.

First, with no JobParameter specified.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar

Log:

2022-05-10 01:53:09.280  INFO 22945 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2022-05-10 01:53:09.457  INFO 22945 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{jsr_batch_run_id=1}]
2022-05-10 01:53:09.592  INFO 22945 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: jsr_batch_run_id = 1
----- end -----
2022-05-10 01:53:09.680  INFO 22945 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 88ms
2022-05-10 01:53:09.765  INFO 22945 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{jsr_batch_run_id=1}] and the following status: [COMPLETED] in 237ms

What was run.id with the JobParametersIncrementer (RunIdIncrementer) is now jsr_batch_run_id, but the behavior looks similar.

2022-05-10 01:53:09.280  INFO 22945 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2022-05-10 01:53:09.457  INFO 22945 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{jsr_batch_run_id=1}]
2022-05-10 01:53:09.592  INFO 22945 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: jsr_batch_run_id = 1
----- end -----

Let's run it once more.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar

jsr_batch_run_id was incremented.

2022-05-10 01:53:37.420  INFO 23002 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2022-05-10 01:53:37.638  INFO 23002 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{jsr_batch_run_id=3}]
2022-05-10 01:53:37.741  INFO 23002 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: jsr_batch_run_id = 3
----- end -----
2022-05-10 01:53:37.831  INFO 23002 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 89ms
2022-05-10 01:53:37.905  INFO 23002 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{jsr_batch_run_id=3}] and the following status: [COMPLETED] in 202ms

However, the number skipped a value for some reason. More on that later.

Specify JobParameters.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar param1=value1-1 param2=value2-1

The JobParameters are added and jsr_batch_run_id is incremented, again skipping a number.

2022-05-10 01:54:05.861  INFO 23096 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [param1=value1-1, param2=value2-1]
2022-05-10 01:54:06.070  INFO 23096 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{param1=value1-1, param2=value2-1, jsr_batch_run_id=5}]
2022-05-10 01:54:06.165  INFO 23096 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: jsr_batch_run_id = 5
parameter: param1 = value1-1
parameter: param2 = value2-1
----- end -----
2022-05-10 01:54:06.266  INFO 23096 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 101ms
2022-05-10 01:54:06.329  INFO 23096 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{param1=value1-1, param2=value2-1, jsr_batch_run_id=5}] and the following status: [COMPLETED] in 207ms

Reduce the JobParameters and change a value.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar param2=value2-2

This time, the JobParameter specified in the previous run, which was restored when the JobParametersIncrementer (RunIdIncrementer) was in use, is not restored.

2022-05-10 01:54:23.120  INFO 23147 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [param2=value2-2]
2022-05-10 01:54:23.262  INFO 23147 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{param2=value2-2, jsr_batch_run_id=7}]
2022-05-10 01:54:23.354  INFO 23147 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: jsr_batch_run_id = 7
parameter: param2 = value2-2
----- end -----
2022-05-10 01:54:23.439  INFO 23147 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 85ms
2022-05-10 01:54:23.487  INFO 23147 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{param2=value2-2, jsr_batch_run_id=7}] and the following status: [COMPLETED] in 185ms

Only what was specified on the command line is present.

----- start -----
parameter: jsr_batch_run_id = 7
parameter: param2 = value2-2
----- end -----

Run with the same values as the first time JobParameters were specified.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar param1=value1-1 param2=value2-1

This works too.

2022-05-10 01:54:41.074  INFO 23207 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [param1=value1-1, param2=value2-1]
2022-05-10 01:54:41.250  INFO 23207 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{param1=value1-1, param2=value2-1, jsr_batch_run_id=9}]
2022-05-10 01:54:41.371  INFO 23207 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: jsr_batch_run_id = 9
parameter: param1 = value1-1
parameter: param2 = value2-1
----- end -----
2022-05-10 01:54:41.454  INFO 23207 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 82ms
2022-05-10 01:54:41.498  INFO 23207 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{param1=value1-1, param2=value2-1, jsr_batch_run_id=9}] and the following status: [COMPLETED] in 202ms

Change the specified values.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar param1=value1-2 param2=value2-3

This is fine as well.

2022-05-10 01:55:01.612  INFO 23283 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [param1=value1-2, param2=value2-3]
2022-05-10 01:55:01.837  INFO 23283 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{param1=value1-2, param2=value2-3, jsr_batch_run_id=11}]
2022-05-10 01:55:01.935  INFO 23283 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: jsr_batch_run_id = 11
parameter: param1 = value1-2
parameter: param2 = value2-3
----- end -----
2022-05-10 01:55:02.050  INFO 23283 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 115ms
2022-05-10 01:55:02.112  INFO 23283 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{param1=value1-2, param2=value2-3, jsr_batch_run_id=11}] and the following status: [COMPLETED] in 229ms

Finally, remove the JobParameters.

$ java -Dspring.batch.job.names=withJobParametersIncrementerJob -jar target/batch-jobparameters-incrementer-0.0.1-SNAPSHOT.jar

Only jsr_batch_run_id remains.

2022-05-10 01:55:22.130  INFO 23339 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2022-05-10 01:55:22.283  INFO 23339 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] launched with the following parameters: [{jsr_batch_run_id=13}]
2022-05-10 01:55:22.372  INFO 23339 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [withJobParametersIncrementerStep]
----- start -----
parameter: jsr_batch_run_id = 13
----- end -----
2022-05-10 01:55:22.523  INFO 23339 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [withJobParametersIncrementerStep] executed in 151ms
2022-05-10 01:55:22.614  INFO 23339 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=withJobParametersIncrementerJob]] completed with the following parameters: [{jsr_batch_run_id=13}] and the following status: [COMPLETED] in 253ms

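I think this behavior is fairly easy to understand.

So how does the increment work? Let's take a quick look at the source code.

The actual increment happens here. When jsr_batch_run_id is not given as a JobParameter at Job launch, the value appears to be incremented.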

https://github.com/spring-projects/spring-batch/blob/4.3.5/spring-batch-core/src/main/java/org/springframework/batch/core/jsr/JsrJobParametersConverter.java#L104-L106
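
As a rough illustration of that rule, here is a plain-Java sketch. It only models the behavior described here and is not the Spring Batch implementation: the class name RunIdConverterSketch is hypothetical, parameters are plain strings, and an AtomicLong stands in for the database-backed incrementer.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of "add jsr_batch_run_id only when it was not supplied".
class RunIdConverterSketch {
    static final String JOB_RUN_ID = "jsr_batch_run_id";

    // Stand-in for the database sequence used by the real converter.
    private final AtomicLong sequence;

    RunIdConverterSketch(AtomicLong sequence) {
        this.sequence = sequence;
    }

    // Only the given parameters are used; nothing is restored from earlier runs.
    Map<String, String> convert(Map<String, String> commandLine) {
        Map<String, String> parameters = new LinkedHashMap<>(commandLine);
        if (!parameters.containsKey(JOB_RUN_ID)) {
            parameters.put(JOB_RUN_ID, String.valueOf(sequence.incrementAndGet()));
        }
        return parameters;
    }

    public static void main(String[] args) {
        RunIdConverterSketch converter = new RunIdConverterSketch(new AtomicLong());

        // prints {param2=value2-2, jsr_batch_run_id=1}
        System.out.println(converter.convert(Map.of("param2", "value2-2")));
    }
}
```

Because the converter only looks at the parameters it is handed, this also fits the observation that previously specified JobParameters are not restored.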

Internally it uses a DataFieldMaxValueIncrementer; since MySQL is used here, the implementation actually in use is this one.

https://github.com/spring-projects/spring-framework/blob/v5.3.19/spring-jdbc/src/main/java/org/springframework/jdbc/support/incrementer/MySQLMaxValueIncrementer.java#L148-L149

It is implemented with the following table.

mysql> select * from BATCH_JOB_SEQ;
+----+------------+
| ID | UNIQUE_KEY |
+----+------------+
| 14 | 0          |
+----+------------+
1 row in set (0.00 sec)

Now, about why the numbers skipped.

In addition to the update that increments jsr_batch_run_id when the Job is launched, the value is also counted up here.

https://github.com/spring-projects/spring-batch/blob/4.3.5/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java#L158

So when JsrJobParametersConverter is used, the value in the BATCH_JOB_SEQ table ends up being incremented twice per execution.

That is why the numbers skip.
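
The arithmetic can be checked with a small plain-Java model. This is only a sketch under the assumption stated above (one shared counter, taken twice per launch); the class SharedSequenceSketch and its methods are hypothetical and do not touch a database.

```java
import java.util.Arrays;

// Hypothetical model: one counter plays the role of the single row in BATCH_JOB_SEQ.
class SharedSequenceSketch {
    private long batchJobSeq = 0;

    long nextValue() {
        return ++batchJobSeq;
    }

    // One launch takes two values: one for jsr_batch_run_id,
    // and one more when the new JobInstance is created.
    long launch() {
        long runId = nextValue();
        nextValue();
        return runId;
    }

    // jsr_batch_run_id values seen over the given number of launches.
    static long[] runIds(int launches) {
        SharedSequenceSketch sequence = new SharedSequenceSketch();
        long[] ids = new long[launches];
        for (int i = 0; i < launches; i++) {
            ids[i] = sequence.launch();
        }
        return ids;
    }

    // Counter value left in the table after the given number of launches.
    static long sequenceAfter(int launches) {
        SharedSequenceSketch sequence = new SharedSequenceSketch();
        for (int i = 0; i < launches; i++) {
            sequence.launch();
        }
        return sequence.batchJobSeq;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runIds(7))); // prints [1, 3, 5, 7, 9, 11, 13]
        System.out.println(sequenceAfter(7));           // prints 14
    }
}
```

Seven launches give the run ids 1, 3, ..., 13 seen in the logs, and leave the counter at 14, which matches the ID column of BATCH_JOB_SEQ shown above.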

I thought that much would be acceptable, but the Spring Batch implementation of JSR-352 (jBatch) is going to be removed in the next version.

Remove JSR-352 implementation · Issue #3894 · spring-projects/spring-batch · GitHub

Remove JSR-352 implementation · spring-projects/spring-batch@e5c752b · GitHub

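JsrJobParametersConverter is among the classes being removed, so if you are using it, it will no longer be available once you move to Spring Batch 5.0.

Judging from this issue and the mailing list thread it references, jBatch will start requiring CDI, so Spring Batch appears to be dropping its JSR-352 support.

So if you use it now, do so with this situation in mind.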

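Summary

This time, I checked how the behavior around JobParameters changes when a JobParametersIncrementer is used while running Spring Batch through Spring Boot (specifically, the JobLauncherApplicationRunner it includes).

I didn't know it behaved this way, so I was a little surprised, but it is worth keeping in mind.

Finally, here is the whole Job definition with RunIdIncrementer disabled and JsrJobParametersConverter in use.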

src/main/java/org/littlewings/spring/batch/JobConfig.java

package org.littlewings.spring.batch;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.converter.JobParametersConverter;
import org.springframework.batch.core.jsr.JsrJobParametersConverter;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JobConfig {
    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job withJobParametersIncrementerJob() {
        return jobBuilderFactory
                .get("withJobParametersIncrementerJob")
                //.incrementer(new RunIdIncrementer())
                .start(withJobParametersIncrementerStep())
                .build();
    }

    @Bean
    Step withJobParametersIncrementerStep() {
        return stepBuilderFactory
                .get("withJobParametersIncrementerStep")
                .tasklet(jobParametersLoggingTasklet())
                .build();
    }

    @Bean
    @StepScope
    public Tasklet jobParametersLoggingTasklet() {
        return (contribution, chunkContext) -> {
            System.out.println("----- start -----");

            chunkContext
                    .getStepContext()
                    .getJobParameters()
                    .entrySet()
                    .forEach(entry -> System.out.printf("parameter: %s = %s%n", entry.getKey(), entry.getValue()));

            System.out.println("----- end -----");

            return RepeatStatus.FINISHED;
        };
    }

    @Bean
    public JobParametersConverter jobParametersConverter(DataSource dataSource) {
        return new JsrJobParametersConverter(dataSource);
    }
}

Trying database polling with Spring Integration's JDBC support

What is this post for?

I heard that Spring Integration supports database polling over JDBC, so I gave it a quick try.

Spring Integration polling and JDBC support

Spring Integration has a polling feature.

Messaging Channels / Poller

The use cases for polling include the following, and databases are one of them.

  • Polling certain external systems, such as FTP Servers, Databases, and Web Services
  • Polling internal (pollable) message channels
  • Polling internal services (such as repeatedly executing methods on a Java class)

Messaging Channels / Pollable Message Source

Spring Integration also includes JDBC support.

JDBC Support

It provides an Inbound Channel Adapter, an Outbound Channel Adapter, an Outbound Gateway, and more.

This time, I'll use the Inbound Channel Adapter and the Outbound Channel Adapter.

Environment

The environment used this time:

$ java --version
openjdk 17.0.3 2022-04-19
OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-109-generic", arch: "amd64", family: "unix"

MySQL is the database to be polled.

$ mysql --version
mysql  Ver 8.0.29 for Linux on x86_64 (MySQL Community Server - GPL)

MySQL is assumed to be running at 172.17.0.2.

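The task

The task this time is as follows.

  • Use the Inbound Channel Adapter of the JDBC support to poll data registered in a table every 10 seconds
    • The table has a status column; records with status 0 are processed, and the status is set to 1 once processing is done

The table definition is as follows.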

create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);
  • Process the data obtained by polling as follows
    • Write it to standard output
    • Write it to another table using the Outbound Channel Adapter of the JDBC support

The other table is defined as follows.

create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);

I'll implement the two outputs one after the other.

Creating the project

First, create a Spring Boot project with Spring Initializr.

Add integration, jdbc, and mysql as dependencies.

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.6.7 \
  -d javaVersion=17 \
  -d name=integration-jdbc-polling \
  -d groupId=org.littlewings \
  -d artifactId=integration-jdbc-polling \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.integration \
  -d dependencies=integration,jdbc,mysql \
  -d baseDir=integration-jdbc-polling | tar zxvf -

Move into the project.

$ cd integration-jdbc-polling

Delete the generated source code for now.

$ rm src/main/java/org/littlewings/spring/integration/IntegrationJdbcPollingApplication.java src/test/java/org/littlewings/spring/integration/IntegrationJdbcPollingApplicationTests.java

The Maven dependencies and plugin settings:

        <properties>
                <java.version>17</java.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-integration</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-jdbc</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-jdbc</artifactId>
                </dependency>

                <dependency>
                        <groupId>mysql</groupId>
                        <artifactId>mysql-connector-java</artifactId>
                        <scope>runtime</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-test</artifactId>
                        <scope>test</scope>
                </dependency>
        </dependencies>

        <build>
                <plugins>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
                        </plugin>
                </plugins>
        </build>

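The key dependency this time is spring-integration-jdbc.

Incidentally, when I first tried Spring Integration I used the Redis Inbound Channel Adapter, and back then the application exited unless web was present (that is, unless a server was running), so I added it. This time that does not seem to be necessary.

Trying Spring Integration - CLOVER🍀

First, create just the main class.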

src/main/java/org/littlewings/spring/integration/App.java

package org.littlewings.spring.integration;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

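Polling the database and writing the retrieved data to standard output

Let's start with this part.

  • Use the Inbound Channel Adapter of the JDBC support to poll data registered in a table every 10 seconds
    • The table has a status column; records with status 0 are processed, and the status is set to 1 once processing is done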
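
Before wiring this up with Spring Integration, the select-then-mark cycle can be pictured with a small in-memory model. This is only a sketch of the idea: the class StatusPollingSketch, its Row type, and pollOnce are hypothetical names, and a List stands in for the polling_message table.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of "pick rows with status 0, then mark them as 1".
class StatusPollingSketch {
    static class Row {
        final int id;
        final String messageText;
        int processStatus = 0; // 0 = not processed yet, 1 = processed

        Row(int id, String messageText) {
            this.id = id;
            this.messageText = messageText;
        }
    }

    // One polling cycle: select the unprocessed rows, then update their status.
    static List<Row> pollOnce(List<Row> table) {
        List<Row> picked = new ArrayList<>();
        for (Row row : table) {
            if (row.processStatus == 0) {
                picked.add(row);
            }
        }
        for (Row row : picked) {
            row.processStatus = 1;
        }
        return picked;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, "hello"));
        table.add(new Row(2, "world"));

        System.out.println(pollOnce(table).size()); // prints 2
        System.out.println(pollOnce(table).size()); // prints 0
    }
}
```

The second cycle finds nothing because the first one already marked every row, which is the effect the status column is meant to have.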

Define the table in schema.sql.

src/main/resources/schema.sql

drop table if exists polling_message;

create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);

Also create a class that corresponds to the table.

src/main/java/org/littlewings/spring/integration/PollingMessage.java

package org.littlewings.spring.integration;

public class PollingMessage {

    Integer id;
    String messageText;
    Integer processStatus;

    // getters/setters omitted
}

This class is not strictly required here, but I'll map the results obtained by polling to it.
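
For reference, the omitted accessors could be written as below. This is just one plausible form following the usual JavaBeans naming; the package declaration and the public modifier are left out here only to keep the snippet self-contained.

```java
// Sketch of PollingMessage with the accessors spelled out.
// Property names follow the fields shown above; nothing else is assumed.
class PollingMessage {
    Integer id;
    String messageText;
    Integer processStatus;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getMessageText() {
        return messageText;
    }

    public void setMessageText(String messageText) {
        this.messageText = messageText;
    }

    public Integer getProcessStatus() {
        return processStatus;
    }

    public void setProcessStatus(Integer processStatus) {
        this.processStatus = processStatus;
    }
}
```

With these setters in place, a column such as message_text can be matched to the messageText property when rows are mapped to the class.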

And here is the flow I put together.

src/main/java/org/littlewings/spring/integration/JdbcPollingConfig.java

package org.littlewings.spring.integration;

import java.time.Duration;
import java.util.List;
import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.messaging.MessageHandler;

@Configuration
public class JdbcPollingConfig {
    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

        return jdbcPollingChannelAdapter;
    }

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

    @Bean
    public IntegrationFlow stdout() {
        Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
                })
                .get();
    }
}

For the Inbound Channel Adapter part, I referred to this section of the documentation.

JDBC Support / Inbound Channel Adapter

It is written like this.

    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

        return jdbcPollingChannelAdapter;
    }

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

Let's go through it step by step.

Using JdbcPollingChannelAdapter, specify the DataSource to access and the SQL used for polling.

        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

This is the update SQL that runs after polling. It changes the status of the polled rows.

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");

Parameters prefixed with : refer to values from each row fetched by the poll. This relies on the named parameter support in Spring JDBC.

The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which, in the preceding example, is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC, combined with a convention (projection onto the polled result list) adopted in Spring Integration.

JDBC Support / Inbound Channel Adapter

Incidentally, multiple values appear to be passed in, which is why the where clause uses in.
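To make the projection onto the polled rows concrete, here is a minimal plain-Java sketch of its visible effect: one ? per polled row, with the ids as bind values. It only mimics the result seen later in the log (in (?, ?, ?)) and is not how Spring Integration or Spring JDBC implement it. NamedParameterProjectionSketch, Row, expandIdPlaceholder and bindValues are hypothetical names introduced for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: mimics the observable effect of the ":id" projection,
// not the actual implementation inside Spring Integration / Spring JDBC.
class NamedParameterProjectionSketch {

    // Hypothetical stand-in for the PollingMessage class used in this article.
    static final class Row {
        final int id;
        final String messageText;

        Row(int id, String messageText) {
            this.id = id;
            this.messageText = messageText;
        }
    }

    // Replaces the single ":id" placeholder with one "?" per polled row.
    static String expandIdPlaceholder(String sql, List<Row> rows) {
        String placeholders = rows.stream()
                .map(row -> "?")
                .collect(Collectors.joining(", "));
        return sql.replace(":id", placeholders);
    }

    // Collects the id of every polled row, in order, as the bind values.
    static List<Integer> bindValues(List<Row> rows) {
        return rows.stream()
                .map(row -> row.id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Row> polled = List.of(
                new Row(1, "Hello World!!"),
                new Row(3, "Hello Spring Integration"),
                new Row(4, "Hello Spring Boot"));

        String updateSql = "update polling_message set process_status = 1 where id in (:id)";

        // prints: update polling_message set process_status = 1 where id in (?, ?, ?)
        System.out.println(expandIdPlaceholder(updateSql, polled));
        // prints: [1, 3, 4]
        System.out.println(bindValues(polled));
    }
}
```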

Then, map the fetched results to the class created earlier.

        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

Here is the configuration for the polling itself.

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

This takes the JdbcPollingChannelAdapter from earlier, sets the polling interval to 10 seconds, and enables transactions.

                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )

When transactions are enabled, the select and the update are executed in the same transaction.

the update and select queries are both executed in the same transaction.

Also, if the downstream channel is a DirectChannel (the default), the endpoints run in the same thread, so they share the same transaction as well.

A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread and, hence, the same transaction. That way, if any of them fail, the transaction rolls back and the input data is reverted to its original state.

In this case, if something fails downstream, the whole thing is rolled back.

JDBC Support / Inbound Channel Adapter / Polling and Transactions

The Java DSL documentation is also a useful reference for how to write an Inbound Channel Adapter.

Java DSL / Inbound Channel Adapters

Next, the part that writes to standard output.

    @Bean
    public IntegrationFlow stdout() {
        Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
                })
                .get();
    }

Because BeanPropertyRowMapper is set on the JdbcPollingChannelAdapter, the query results can be handled as PollingMessage.
Note that the payload is a List of them, though.

                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();

The rest just logs the messages.

                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );

I used the commented-out parts to look at the stack trace, and to throw an exception and confirm that the transaction is rolled back.

Configuration.

src/main/resources/application.properties

spring.datasource.url=jdbc:mysql://172.17.0.2:3306/practice?characterEncoding=utf-8
spring.datasource.username=kazuhira
spring.datasource.password=password

spring.sql.init.mode=always

logging.level.org.springframework.jdbc.core=debug
logging.level.org.springframework.jdbc.support=debug

I configured logging so that the executed SQL and the transaction activity are written to the log.

I also register data at startup. It includes rows that polling should skip.

src/main/resources/data.sql

insert into polling_message(message_text, process_status) values('Hello World!!', 0);
insert into polling_message(message_text, process_status) values('[ignore] こんにちは、世界', 9);
insert into polling_message(message_text, process_status) values('Hello Spring Integration', 0);
insert into polling_message(message_text, process_status) values('Hello Spring Boot', 0);
insert into polling_message(message_text, process_status) values('[ignore] Hello MySQL!!', 9);

Now, let's run it.

Package it,

$ mvn package

and run it.

$ java -jar target/integration-jdbc-polling-0.0.1-SNAPSHOT.jar

The state of the table right after startup.

mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              0 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              0 |
|  4 | Hello Spring Boot                 |              0 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)

Let's look at the log.

2022-05-04 03:05:23.377 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:05:23.378 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:05:23.380 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:05:23.382  INFO 69669 --- [           main] org.littlewings.spring.integration.App   : Started App in 3.857 seconds (JVM running for 4.49)
2022-05-04 03:05:23.387 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:05:23.388 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:05:23.449 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:05:23.459 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:05:23.459 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:05:23.470 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:05:23.471 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:05:23.497  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 1, message_text = Hello World!!, process_status = 0
2022-05-04 03:05:23.500  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 3, message_text = Hello Spring Integration, process_status = 0
2022-05-04 03:05:23.500  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 4, message_text = Hello Spring Boot, process_status = 0
2022-05-04 03:05:23.501 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:05:23.502 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:05:23.523 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction
2022-05-04 03:05:33.363 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:05:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:05:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:05:33.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:05:33.367 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:05:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:05:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:05:33.370 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction

The transaction starts.

2022-05-04 03:05:23.377 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:05:23.378 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction

The select statement is executed.

2022-05-04 03:05:23.387 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:05:23.388 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]

The update statement is executed.

2022-05-04 03:05:23.470 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:05:23.471 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]

The fetched records are output.

2022-05-04 03:05:23.497  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 1, message_text = Hello World!!, process_status = 0
2022-05-04 03:05:23.500  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 3, message_text = Hello Spring Integration, process_status = 0
2022-05-04 03:05:23.500  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 4, message_text = Hello Spring Boot, process_status = 0

And we can follow everything through to the transaction commit.

2022-05-04 03:05:23.501 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:05:23.502 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]

Judging by the thread name, the whole sequence appears to run on the same thread.

After that, the select statement runs every 10 seconds, but since the data has already been updated, there is nothing left to process.

2022-05-04 03:05:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:05:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:05:33.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:05:33.367 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:05:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:05:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:05:33.370 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction

The table now looks like this.

mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              1 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              1 |
|  4 | Hello Spring Boot                 |              1 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)

The select statement used for polling only targets rows whose process_status is 0, so the records set to 9 remain untouched.

Next, let's add some data. It includes rows that should not be processed.

mysql> insert into polling_message(message_text, process_status) values('Spring Integration JDBC Support', 0);
Query OK, 1 row affected (0.03 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] Hello InnoDB', 9);
Query OK, 1 row affected (0.03 sec)

mysql> insert into polling_message(message_text, process_status) values('Hello MySQL Connector/J', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] はじめてのSpring Integration', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('JDBCでデータベースポーリング', 0);
Query OK, 1 row affected (0.02 sec)

Then, on the next poll, the data is fetched and written to standard output.

2022-05-04 03:08:23.363 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:08:23.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:08:23.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:08:23.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:08:23.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:08:23.368 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:08:23.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:08:23.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:08:23.370 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:08:23.370 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:08:23.372  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 6, message_text = Spring Integration JDBC Support, process_status = 0
2022-05-04 03:08:23.372  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 8, message_text = Hello MySQL Connector/J, process_status = 0
2022-05-04 03:08:23.372  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 10, message_text = JDBCでデータベースポーリング, process_status = 0
2022-05-04 03:08:23.372 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:08:23.372 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:08:23.462 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction

After that, there is again nothing to process.

2022-05-04 03:08:33.364 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:08:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:08:33.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:08:33.367 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:08:33.367 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:08:33.368 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:08:33.368 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:08:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction

The state of the data.

mysql> select * from polling_message;
+----+--------------------------------------------+----------------+
| id | message_text                               | process_status |
+----+--------------------------------------------+----------------+
|  1 | Hello World!!                              |              1 |
|  2 | [ignore] こんにちは、世界                  |              9 |
|  3 | Hello Spring Integration                   |              1 |
|  4 | Hello Spring Boot                          |              1 |
|  5 | [ignore] Hello MySQL!!                     |              9 |
|  6 | Spring Integration JDBC Support            |              1 |
|  7 | [ignore] Hello InnoDB                      |              9 |
|  8 | Hello MySQL Connector/J                    |              1 |
|  9 | [ignore] はじめてのSpring Integration      |              9 |
| 10 | JDBCでデータベースポーリング               |              1 |
+----+--------------------------------------------+----------------+
10 rows in set (0.00 sec)

Looks good.

Polling the database and writing the fetched data to a table

Next, let's poll the table and this time write the fetched data to another table.

We'll use the Outbound Channel Adapter.

JDBC Support / Outbound Channel Adapter

The destination table is defined like this.

create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);

Comment out the Bean definition in the class where the earlier flow was defined,

    // @Bean
    public IntegrationFlow stdout() {

and assemble a JdbcMessageHandler Bean definition plus a flow that uses it.

    @Bean
    public MessageHandler jdbcMessageHandler(DataSource dataSource) {
        JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(
                dataSource,
                """
                        insert into processed_message_log(message_text) 
                        values(:payload.messageText)
                        """
        );
        return jdbcMessageHandler;
    }

    @Bean
    public IntegrationFlow writeTable() {
        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(jdbcMessageHandler(null))
                .get();
    }

A note on the :payload.messageText notation in the insert statement.

                """
                        insert into processed_message_log(message_text) 
                        values(:payload.messageText)
                        """

What it refers to is Message#getPayload.

Message (Spring Framework 5.3.19 API)

Because BeanPropertyRowMapper is set on the JdbcPollingChannelAdapter, the payload contains instances of the PollingMessage class created earlier,
so this expression ends up calling PollingMessage#getMessageText.

    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

        return jdbcPollingChannelAdapter;
    }

Note that in this case the items seem to be processed one at a time.

Register the JdbcMessageHandler defined above as the flow's handler.

    @Bean
    public IntegrationFlow writeTable() {
        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(jdbcMessageHandler(null))
                .get();
    }

schema.sql is changed as follows. The tables are recreated, so the results of the earlier run are lost.

src/main/resources/schema.sql

drop table if exists polling_message;

create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);

drop table if exists processed_message_log;

create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);

Now, let's check that it works.

Package it and start it.

$ mvn package
$ java -jar target/integration-jdbc-polling-0.0.1-SNAPSHOT.jar

The log looked like this.

2022-05-04 03:18:33.300 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:18:33.301 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:18:33.303 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:18:33.308 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:18:33.308 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:18:33.310  INFO 70288 --- [           main] org.littlewings.spring.integration.App   : Started App in 3.268 seconds (JVM running for 3.712)
2022-05-04 03:18:33.348 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:18:33.350 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:18:33.350 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:18:33.357 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:18:33.358 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:18:33.371 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing SQL batch update [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:18:33.372 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:18:33.373 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcUtils               : JDBC driver supports batch updates
2022-05-04 03:18:33.389 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:18:33.390 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:18:33.423 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction
2022-05-04 03:18:43.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:18:43.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:18:43.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:18:43.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:18:43.297 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:18:43.299 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:18:43.299 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:18:43.300 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction

The difference from before is that the fetched results are no longer written to standard output. Instead, an insert statement is executed.

2022-05-04 03:18:33.371 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing SQL batch update [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:18:33.372 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:18:33.373 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcUtils               : JDBC driver supports batch updates

It appears to be doing a batch update.

The commit comes after the insert statement.

2022-05-04 03:18:33.389 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:18:33.390 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]

Let's check the data.

At the start it was in this state,

mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              0 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              0 |
|  4 | Hello Spring Boot                 |              0 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)

After processing, the state of polling_message is the same as in the previous run:

mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              1 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              1 |
|  4 | Hello Spring Boot                 |              1 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)

In addition, we can confirm that the processed rows have been inserted into a separate table by JdbcMessageHandler.

mysql> select * from processed_message_log;
+--------+--------------------------+
| log_id | message_text             |
+--------+--------------------------+
|      1 | Hello World!!            |
|      2 | Hello Spring Integration |
|      3 | Hello Spring Boot        |
+--------+--------------------------+
3 rows in set (0.00 sec)
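The state changes shown in the tables above can be modeled in memory. The following is only a sketch under stated assumptions: `PollingCycleSketch` and `Row` are hypothetical names, lists stand in for the two tables, and no database or Spring Integration code is involved. Each poll selects rows with process_status = 0, marks them as 1, and appends their message_text to the log.

```java
import java.util.ArrayList;
import java.util.List;

final class PollingCycleSketch {
    // Mutable row standing in for one record of polling_message.
    static final class Row {
        final int id;
        final String messageText;
        int processStatus;

        Row(int id, String messageText, int processStatus) {
            this.id = id;
            this.messageText = messageText;
            this.processStatus = processStatus;
        }
    }

    // In-memory stand-ins for polling_message and processed_message_log.
    final List<Row> pollingMessage = new ArrayList<>();
    final List<String> processedMessageLog = new ArrayList<>();

    // One poll: pick rows with process_status = 0, set them to 1,
    // and append their message_text to the log. Returns the processed ids.
    List<Integer> poll() {
        List<Integer> processedIds = new ArrayList<>();
        for (Row row : pollingMessage) {
            if (row.processStatus == 0) {
                row.processStatus = 1;
                processedMessageLog.add(row.messageText);
                processedIds.add(row.id);
            }
        }
        return processedIds;
    }
}
```

With the five initial rows from this article, the first call to `poll()` handles ids 1, 3 and 4, and a second call finds nothing to do, which matches the polls in the log that only run the select.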

As before, add some more data.

mysql> insert into polling_message(message_text, process_status) values('Spring Integration JDBC Support', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] Hello InnoDB', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('Hello MySQL Connector/J', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] はじめてのSpring Integration', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('JDBCでデータベースポーリング', 0);
Query OK, 1 row affected (0.02 sec)

The log:

2022-05-04 03:24:03.294 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:24:03.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:24:03.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:24:03.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:24:03.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:24:03.298 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:24:03.298 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:24:03.299 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:24:03.300 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:24:03.300 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing SQL batch update [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcUtils               : JDBC driver supports batch updates
2022-05-04 03:24:03.306 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:24:03.306 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:24:03.662 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction
2022-05-04 03:24:13.294 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:24:13.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:24:13.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:24:13.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:24:13.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:24:13.297 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:24:13.297 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:24:13.298 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction

The newly inserted rows were detected and processed.

2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing SQL batch update [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcUtils               : JDBC driver supports batch updates
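In the full log above, the update statement configured as `where id in (:id)` is executed as `where id in (?, ?, ?)`, that is, with one placeholder per polled row. The following plain-Java sketch illustrates that expansion. It is a simplification for illustration only, not Spring's named-parameter parsing: `InClauseSketch` is a hypothetical name, and the naive string replacement would also match longer names that start with the same prefix.

```java
import java.util.Collections;

final class InClauseSketch {
    // Replaces ":name" with a comma-separated list of "?" placeholders,
    // one per value bound to that name.
    static String expand(String sql, String name, int valueCount) {
        if (valueCount < 1) {
            throw new IllegalArgumentException("at least one value is required");
        }
        String placeholders = String.join(", ", Collections.nCopies(valueCount, "?"));
        return sql.replace(":" + name, placeholders);
    }
}
```

Calling `expand("update polling_message set process_status = 1 where id in (:id)", "id", 3)` yields the statement text seen in the log.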

The data has been updated as well.

mysql> select * from polling_message;
+----+--------------------------------------------+----------------+
| id | message_text                               | process_status |
+----+--------------------------------------------+----------------+
|  1 | Hello World!!                              |              1 |
|  2 | [ignore] こんにちは、世界                  |              9 |
|  3 | Hello Spring Integration                   |              1 |
|  4 | Hello Spring Boot                          |              1 |
|  5 | [ignore] Hello MySQL!!                     |              9 |
|  6 | Spring Integration JDBC Support            |              1 |
|  7 | [ignore] Hello InnoDB                      |              9 |
|  8 | Hello MySQL Connector/J                    |              1 |
|  9 | [ignore] はじめてのSpring Integration      |              9 |
| 10 | JDBCでデータベースポーリング               |              1 |
+----+--------------------------------------------+----------------+
10 rows in set (0.00 sec)

mysql> select * from processed_message_log;
+--------+------------------------------------------+
| log_id | message_text                             |
+--------+------------------------------------------+
|      1 | Hello World!!                            |
|      2 | Hello Spring Integration                 |
|      3 | Hello Spring Boot                        |
|      4 | Spring Integration JDBC Support          |
|      5 | Hello MySQL Connector/J                  |
|      6 | JDBCでデータベースポーリング             |
+--------+------------------------------------------+
6 rows in set (0.00 sec)

With this, the Outbound Channel Adapter side has been confirmed as well.

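Summary

I tried out database polling using Spring Integration's JDBC support.

I struggled a little with expressing the JDBC support features in the Java DSL, but once I understood that part, getting it to run was fairly straightforward.

I feel like I'm gradually getting used to it.

Finally, here is the whole class in which the flows are defined. This is the version with the Outbound Channel Adapter enabled.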

src/main/java/org/littlewings/spring/integration/JdbcPollingConfig.java

package org.littlewings.spring.integration;

import java.time.Duration;
import java.util.List;
import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.messaging.MessageHandler;

@Configuration
public class JdbcPollingConfig {
    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

        return jdbcPollingChannelAdapter;
    }

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

    // @Bean (disabled in this version: the writeTable flow below consumes the channel instead)
    public IntegrationFlow stdout() {
        Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
                })
                .get();
    }

    @Bean
    public MessageHandler jdbcMessageHandler(DataSource dataSource) {
        JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(
                dataSource,
                """
                        insert into processed_message_log(message_text)
                        values(:payload.messageText)
                        """
        );
        return jdbcMessageHandler;
    }

    @Bean
    public IntegrationFlow writeTable() {
        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(jdbcMessageHandler(null))
                .get();
    }
}
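The insert statement in the class above uses the parameter `:payload.messageText`, a dotted path that is resolved against each message. As a minimal illustration of how such a path can be walked (this reflection-based sketch is an assumption made for explanation and is not how Spring resolves it internally; `PropertyPathSketch`, `Envelope` and `Item` are hypothetical names), consider:

```java
import java.lang.reflect.Method;

final class PropertyPathSketch {
    // Hypothetical stand-in for one polled item.
    static final class Item {
        private final String messageText;
        Item(String messageText) { this.messageText = messageText; }
        public String getMessageText() { return messageText; }
    }

    // Hypothetical stand-in for a message wrapping a payload.
    static final class Envelope {
        private final Object payload;
        Envelope(Object payload) { this.payload = payload; }
        public Object getPayload() { return payload; }
    }

    // Walks a dotted path such as "payload.messageText" by calling
    // the matching public getter for each segment in turn.
    static Object resolve(Object root, String path) {
        Object current = root;
        for (String name : path.split("\\.")) {
            String getter = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                Method method = current.getClass().getMethod(getter);
                current = method.invoke(current);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException(
                        "cannot resolve '" + name + "' in path " + path, e);
            }
        }
        return current;
    }
}
```

Resolving `"payload.messageText"` against an `Envelope` that wraps an `Item` returns the item's message text, which is the value bound to the insert in this article's flow.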