CLOVER🍀

That was when it all began.

Spring Batch × Spring Integration(Spring Batch Integration)を試す

これは、なにをしたくて書いたもの?

Spring Batchのドキュメントを見ていて、Spring Integrationと組み合わせられそうだったので、ちょっと試してみようかなと。

Spring Batch Integration

Spring Batchと、Spring Integrationの組み合わせについては、こちらに記載があります。

Spring Batch Integration

Spring Batch Integrationと呼ぶようです。

Spring Batchのユーザー、Spring Integrationのユーザーともに、双方を合わせて使うことで要件をより効率的に実現できるケースに
遭遇する可能性がある、という話のようです。

Many users of Spring Batch may encounter requirements that are outside the scope of Spring Batch but that may be efficiently and concisely implemented by using Spring Integration. Conversely, Spring Integration users may encounter Spring Batch requirements and need a way to efficiently integrate both frameworks.

Spring Batch Integration / Spring Batch Integration Introduction

バッチプロセスにメッセージングを追加することで、オペレーションの自動化と、キーとなる懸念事項の分離が可能になります。

Adding messaging to a batch process enables automation of operations and also separation and strategizing of key concerns.

ざっくり言うと、メッセージングの仕組みからSpring BatchのJobを起動するためのもの、みたいです。
また、メッセージングの仕組みをジョブに埋め込み、ワークロードを複数のワーカー(リモートパーティショニング、リモートチャンク)に
分散したりもできるようです。

メッセージングを使ったSpring Batchの起動について。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages

Spring BatchをCommandLineJobRunnerを使って起動したり、WebアプリケーションにおいてJobOperatorを直接扱う使い方も
ありますが、より複雑なユースケースについて挙げてみます。

バッチジョブのデータを取得するために、リモートのFTPやSFTPサーバーへのポーリングを行う必要があったり、複数の異なる
データソースをサポートする必要があるかもしれません。

Maybe you need to poll a remote (S)FTP server to retrieve the data for the Batch Job or your application has to support multiple different data sources simultaneously.

Webだけでなく、FTPなどからソースとなるデータファイルを受け取るかもしれません。Spring Batchを呼び出す前に、入力ファイルの
変換が必要になる場合もあるかもしれません。

For example, you may receive data files not only from the web, but also from FTP and other sources. Maybe additional transformation of the input files is needed before invoking Spring Batch.

このような場合は、Spring Integrationとその多数のアダプターを使ってバッチジョブを実行すると、より強力な仕組みになるでしょう。

Therefore, it would be much more powerful to execute the batch job using Spring Integration and its numerous adapters.

たとえば、File Inbound Channel Adapterを使用してファイルシステム内のディレクトリを監視し、入力ファイルが到着したらすぐに
バッチジョブを開始することができます。さらに、複数の異なるアダプターを使用してSpring Integrationのフローを作成し、
Configurationのみで複数のソースから同時にバッチジョブのデータを取り込むこともできます。

For example, you can use a File Inbound Channel Adapter to monitor a directory in the file-system and start the Batch Job as soon as the input file arrives. Additionally, you can create Spring Integration flows that use multiple different adapters to easily ingest data for your batch jobs from multiple sources simultaneously using only configuration.

この仕組みを実現するために、Spring Batch Integrationでは以下を提供します。

  • バッチジョブの起動を行うJobLaunchingMessageHandler
  • JobLaunchingMessageHandlerの入力となるペイロードを持つJobLaunchRequest

JobLaunchRequestは、バッチジョブを起動するのに必要なJobParametersJobのラッパーです。

あとは、Spring IntegrationのMessageからJobLaunchRequestへ変換する方法、

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / Transforming a file into a JobLaunchRequest

JobExecutionについて(ジョブのリポジトリを参照してステータスを確認すること)、

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / The JobExecution Response

Spring Batch Integrationの設定例、

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / Spring Batch Integration Configuration

ジョブの中で使われるItemReaderの設定例といったものが続きます。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / Example ItemReader Configuration

と、ドキュメントだけを眺めていてもわからないので、実際に使っていってみましょう。

お題

今回のお題は、このようにします。

  • 特定のディレクトリをポーリングして、CSVファイルの配置を監視
  • CSVファイルが配置されたら、Spring Batchを起動
  • Spring Batchのジョブは、CSVファイルを読み込むItemReaderと、読み込んだデータをデータベースに反映するItemWriterで構成
    • データベースへの反映にはJPAを使用
  • 同じファイルを置いても、ジョブを起動して上書き更新する
    • 上書きしたことがわかるように、データベース上のテーブルには更新時間を設ける

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.3 2022-04-19
OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-110-generic", arch: "amd64", family: "unix"

データベースには、MySQLを使用します。MySQLは172.17.0.2で動作しているものとします。

$ mysql --version
mysql  Ver 8.0.29 for Linux on x86_64 (MySQL Community Server - GPL)

プロジェクトを作成する

まずは、Spring Bootプロジェクトを作成します。依存関係にはbatchintegrationdata-jpamysqlを指定。

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.6.7 \
  -d javaVersion=17 \
  -d name=batch-integration-example \
  -d groupId=org.littlewings \
  -d artifactId=batch-integration-example \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.batch.integration \
  -d dependencies=batch,integration,data-jpa,mysql \
  -d baseDir=batch-integration-example | tar zxvf -

プロジェクト内へ移動。

$ cd batch-integration-example

Maven依存関係およびプラグイン設定。

        <properties>
                <java.version>17</java.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-batch</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-data-jpa</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-integration</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-jpa</artifactId>
                </dependency>

                <dependency>
                        <groupId>mysql</groupId>
                        <artifactId>mysql-connector-java</artifactId>
                        <scope>runtime</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.batch</groupId>
                        <artifactId>spring-batch-test</artifactId>
                        <scope>test</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-test</artifactId>
                        <scope>test</scope>
                </dependency>
        </dependencies>

        <build>
                <plugins>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
                        </plugin>
                </plugins>
        </build>

生成されたソースコードは削除しておきます。

$ rm src/main/java/org/littlewings/spring/batch/integration/BatchIntegrationExampleApplication.java src/test/java/org/littlewings/spring/batch/integration/BatchIntegrationExampleApplicationTests.java

今回使いたい依存ライブラリが足りないのと、使わないものもあるので少し変更。

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <!--
       <dependency>
           <groupId>org.springframework.integration</groupId>
           <artifactId>spring-integration-jpa</artifactId>
       </dependency>
       -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

まず、今回の主題であるspring-batch-integrationを追加、それからSpring Integrationを使ったファイル監視のためにspring-integration-fileを追加。
そして、spring-integration-jpaは使わないのでコメントアウトしておきます。

テーブル定義は、このようにしておきます。

src/main/resources/schema.sql

drop table if exists person;

create table person (
  id integer,
  last_name varchar(10),
  first_name varchar(10),
  age integer,
  updated_time datetime,
  primary key(id)
);

アプリケーションの設定はこちら。

src/main/resources/application.properties

spring.datasource.url=jdbc:mysql://172.17.0.2:3306/practice?characterEncoding=utf-8
spring.datasource.username=kazuhira
spring.datasource.password=password

spring.sql.init.mode=always
spring.batch.jdbc.initialize-schema=always

spring.batch.job.enabled=false

schema.sqlは常に適用するようにして、Spring Batchが使用するテーブルも作成するようにします。

もうひとつポイントがあるのですが、それはまた説明します。

アプリケーションを作成する

では、アプリケーションを作成していきましょう。

まずはmainクラスの作成。

src/main/java/org/littlewings/spring/batch/integration/App.java

package org.littlewings.spring.batch.integration;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

@EnableBatchProcessingアノテーションは必要です。Spring BatchのJobBuilderFactoryStepBuilderFactoryなどが
AutoConfigurationされるようにする必要があるので。

JPAのエンティティクラス。

src/main/java/org/littlewings/spring/batch/integration/Person.java

package org.littlewings.spring.batch.integration;

import java.time.LocalDateTime;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "person")
public class Person {
    @Id
    @Column(name = "id")
    Integer id;

    @Column(name = "last_name")
    String lastName;

    @Column(name = "first_name")
    String firstName;

    @Column(name = "age")
    Integer age;

    @Column(name = "updated_time")
    LocalDateTime updatedTime;

    // getter/setterは省略
}

DDLには存在しなかったupdatedTimeというフィールドがありますが、こちらはItemProcessorで設定することにします。

ここからは、こちらを見ながらSpring IntegrationやSpring Batchの設定を行っていきます。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / Spring Batch Integration Configuration

最初は@Configurationを付与したクラスの雛形を作成。

src/main/java/org/littlewings/spring/batch/integration/IntegrationJobConfig.java

package org.littlewings.spring.batch.integration;

import java.io.File;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
import javax.persistence.EntityManagerFactory;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.integration.launch.JobLaunchingGateway;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.PathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.handler.LoggingHandler;

@Configuration
public class IntegrationJobConfig {

    // 後で
}

CSVファイルの監視から書いていきましょう。Spring IntegrationのFile Supportを使います。

File Support

    @Bean
    public IntegrationFlow integrationFlow() throws Exception {
        return IntegrationFlows
                .from(
                        Files
                                .inboundAdapter(new File("target/files"))
                                .filter(new SimplePatternFileListFilter("*.csv")),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L))
                )
                .log(LoggingHandler.Level.INFO, "polling file")
                .transform(fileMessageToJobRequest(null))
                .log(LoggingHandler.Level.INFO, "transform job request")
                .handle(jobLaunchingGateway(null))
                .log(LoggingHandler.Level.INFO, "job execution result")
                .get();
    }

target/filesディレクトリ配下の.csv拡張子のファイルを、1秒おきに監視します。また、1回で取得するファイルはひとつにしています。

        return IntegrationFlows
                .from(
                        Files
                                .inboundAdapter(new File("target/files"))
                                .filter(new SimplePatternFileListFilter("*.csv")),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L))
                )

ファイルを検出したら、transformした後に処理を行います(handle)。各ステップの間にはログ出力を行うようにしましょう。
logメソッドの第2引数はロガー名です。

                .log(LoggingHandler.Level.INFO, "polling file")
                .transform(fileMessageToJobRequest(null))
                .log(LoggingHandler.Level.INFO, "transform job request")
                .handle(jobLaunchingGateway(null))
                .log(LoggingHandler.Level.INFO, "job execution result")
                .get();

transformメソッドに渡しているのは、Spring IntegrationのMessageをSpring BatchのJobに変換するTransformerです。

ですが、その前にhandleメソッドに渡している処理から見ていきましょう。

こちらでは、JobLaunchingGatewayを作成しています。

    @Bean
    public JobLaunchingGateway jobLaunchingGateway(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SyncTaskExecutor());
        jobLauncher.afterPropertiesSet();

        return new JobLaunchingGateway(jobLauncher);
    }

JobLaunchingGatewayは、Spring BatchのJobを実行するためのMessageHandlerインターフェースの実装です。

JobLaunchingGateway (Spring Batch 4.3.5 API)

このため、JobLauncherインスタンスを必要とします。

Configuring and Running a Job / Configuring a JobLauncher

ここから先は、Spring BatchのJobを構成していきます。

Jobの定義。同じJobParametersの指定でも複数回起動できるように、RunIdIncrementerを指定しています。

    @Bean
    public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory
                .get("loadFileToDatabaseJob")
                .incrementer(new RunIdIncrementer())
                .start(loadFileToDatabaseStep(null))
                .next(deleteFileStep(null))
                .build();
    }

Stepは2つ設けました。

ひとつ目のStepは、ファイルを読み込んでデータベースに書き込むものです。

    @Bean
    public Step loadFileToDatabaseStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory
                .get("loadFileToDatabaseStep")
                .<Person, Person>chunk(3)
                .reader(flatFilePersonItemReader(null))
                .processor(updateTimeProcessor())
                .writer(jpaPersonItemWriter(null))
                .build();
    }

ItemReaderItemProcessorItemWriterの定義はこちら。
CSVファイルを読み込む → エンティティにupdatedTimeを設定する → JPAでデータベースに書き込む、という流れになっています。

    @Bean
    @StepScope
    public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {
        Resource resource = new PathResource(path);

        return new FlatFileItemReaderBuilder<Person>()
                .name("flatFilePersonItemReader")
                .resource(resource)
                .encoding("UTF-8")
                .delimited()
                .names(new String[]{"id", "lastName", "firstName", "age"})
                .linesToSkip(1)
                .targetType(Person.class)
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<Person, Person> updateTimeProcessor() {
        return (person) -> {
            person.setUpdatedTime(LocalDateTime.now());
            return person;
        };
    }

    @Bean
    @StepScope
    public JpaItemWriter<Person> jpaPersonItemWriter(EntityManagerFactory entityManagerFactory) {
        return new JpaItemWriterBuilder<Person>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }

入力となるCSVファイルのパスは、JobParametersとして取得します。

    @Bean
    @StepScope
    public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {

最後は、読み込んだファイルを削除するStepです。ファイルを削除する処理を入れないと、監視対象のディレクトリにファイルが
残り続けるので、データの取り込み処理を延々と繰り返してしまいます。

    @Bean
    public Step deleteFileStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory
                .get("deleteFileStep")
                .tasklet(deleteFileTasklet(null))
                .build();
    }

ファイルの削除を行うTasklet。このようなStepではなく違う方法でもファイルの削除はできると思うのですが、どこかしらに削除の
処理を書かないといけなさそうなことは変わらないので、今回はこの方法を選択しました。

    @Bean
    @StepScope
    public Tasklet deleteFileTasklet(@Value("#{jobParameters['input.file.path']}") String path) {
        return (contribution, context) -> {
            java.nio.file.Files.delete(Paths.get(path));
            return RepeatStatus.FINISHED;
        };
    }

そして、このSpring Batchで定義したJob

    @Bean
    public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory
                .get("loadFileToDatabaseJob")
                .incrementer(new RunIdIncrementer())
                .start(loadFileToDatabaseStep(null))
                .next(deleteFileStep(null))
                .build();
    }

Jobを実行するためにSpring IntegrationのMessageJobに変換するのが、Transformerでした。

    public IntegrationFlow integrationFlow() throws Exception {
        return IntegrationFlows
                .from(
                        Files
                                .inboundAdapter(new File("target/files"))
                                .filter(new SimplePatternFileListFilter("*.csv")),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L))
                )
                .log(LoggingHandler.Level.INFO, "polling file")
                .transform(fileMessageToJobRequest(null))
                .log(LoggingHandler.Level.INFO, "transform job request")
                .handle(jobLaunchingGateway(null))
                .log(LoggingHandler.Level.INFO, "job execution result")
                .get();

その処理の内容は、こちら。

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest(JobExplorer jobExplorer) {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null));
        fileMessageToJobRequest.setJobExplorer(jobExplorer);

        return fileMessageToJobRequest;
    }

参考にしているドキュメントは、こちらですね。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / The JobExecution Response

JobおよびJobExplorerを設定しています。

        fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null));
        fileMessageToJobRequest.setJobExplorer(jobExplorer);

クラスの定義は、こちら。

src/main/java/org/littlewings/spring/batch/integration/FileMessageToJobRequest.java

package org.littlewings.spring.batch.integration;

import java.io.File;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

public class FileMessageToJobRequest {
    Job job;
    JobExplorer jobExplorer;

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(jobExplorer);  // for JobParametersIncrementer

        jobParametersBuilder.getNextJobParameters(job)  // for JobParametersIncrementer
                .addString("input.file.path", message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }

    public Job getJob() {
        return job;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    public JobExplorer getJobExplorer() {
        return jobExplorer;
    }

    public void setJobExplorer(JobExplorer jobExplorer) {
        this.jobExplorer = jobExplorer;
    }
}

ポイントは、こちらのメソッドです。@Transformerアノテーションを付与して、Spring IntegrationのFile Supportを使って検出した
CSVファイル(Message<File>)をJobLaunchRequestに変換します。

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(jobExplorer);  // for JobParametersIncrementer

        jobParametersBuilder.getNextJobParameters(job)  // for JobParametersIncrementer
                .addString("input.file.path", message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }

CSVファイルのパスは、この部分でJobParametersとして設定しています。

                .addString("input.file.path", message.getPayload().getAbsolutePath());

ここでの指定が、こちらに渡されるわけですね。

    @Bean
    @StepScope
    public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {

ところでサンプルコードを見ると、JobExplorerは指定していないのですが。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / The JobExecution Response

今回JobParametersIncrementerRunIdIncrementer)を使ったのでJobParametersBuilderJobExplorerの設定および、
JobParametersBuilder#getNextJobParameters`の呼び出しが必要になります。

もし、RunIdIncrementerなしでできる限り同じJobParametersで実行しようとすると、以下のようにJobの実行の度に異なる
JobParamtersを設定するようなことをする必要が出てくるでしょうね。

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder
                .addString("input.file.path", message.getPayload().getAbsolutePath())
                .addString("job.start.time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss")));

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }

これで、Spring BatchおよびSpring Integrationの設定はできました。

Spring BatchのJobおよびSpring IntegrationのIntegrationFlowを定義しているクラス全体を載せると、こんな感じになっています。

src/main/java/org/littlewings/spring/batch/integration/IntegrationJobConfig.java

package org.littlewings.spring.batch.integration;

import java.io.File;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
import javax.persistence.EntityManagerFactory;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.integration.launch.JobLaunchingGateway;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.PathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.handler.LoggingHandler;

@Configuration
public class IntegrationJobConfig {
    @Bean
    public IntegrationFlow integrationFlow() throws Exception {
        return IntegrationFlows
                .from(
                        Files
                                .inboundAdapter(new File("target/files"))
                                .filter(new SimplePatternFileListFilter("*.csv")),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L))
                )
                .log(LoggingHandler.Level.INFO, "polling file")
                .transform(fileMessageToJobRequest(null))
                .log(LoggingHandler.Level.INFO, "transform job request")
                .handle(jobLaunchingGateway(null))
                .log(LoggingHandler.Level.INFO, "job execution result")
                .get();
    }

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest(JobExplorer jobExplorer) {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null));
        fileMessageToJobRequest.setJobExplorer(jobExplorer);

        return fileMessageToJobRequest;
    }

    @Bean
    public JobLaunchingGateway jobLaunchingGateway(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SyncTaskExecutor());
        jobLauncher.afterPropertiesSet();

        return new JobLaunchingGateway(jobLauncher);
    }

    @Bean
    public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory
                .get("loadFileToDatabaseJob")
                .incrementer(new RunIdIncrementer())
                .start(loadFileToDatabaseStep(null))
                .next(deleteFileStep(null))
                .build();
    }

    @Bean
    public Step loadFileToDatabaseStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory
                .get("loadFileToDatabaseStep")
                .<Person, Person>chunk(3)
                .reader(flatFilePersonItemReader(null))
                .processor(updateTimeProcessor())
                .writer(jpaPersonItemWriter(null))
                .build();
    }

    @Bean
    public Step deleteFileStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory
                .get("deleteFileStep")
                .tasklet(deleteFileTasklet(null))
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {
        Resource resource = new PathResource(path);

        return new FlatFileItemReaderBuilder<Person>()
                .name("flatFilePersonItemReader")
                .resource(resource)
                .encoding("UTF-8")
                .delimited()
                .names(new String[]{"id", "lastName", "firstName", "age"})
                .linesToSkip(1)
                .targetType(Person.class)
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<Person, Person> updateTimeProcessor() {
        return (person) -> {
            person.setUpdatedTime(LocalDateTime.now());
            return person;
        };
    }

    @Bean
    @StepScope
    public JpaItemWriter<Person> jpaPersonItemWriter(EntityManagerFactory entityManagerFactory) {
        return new JpaItemWriterBuilder<Person>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }

    @Bean
    @StepScope
    public Tasklet deleteFileTasklet(@Value("#{jobParameters['input.file.path']}") String path) {
        return (contribution, context) -> {
            java.nio.file.Files.delete(Paths.get(path));
            return RepeatStatus.FINISHED;
        };
    }
}

動作確認してみる

では、作成したアプリケーションを動かしてみましょう。

読み込むCSVファイルを、2つ用意します。

src/main/resources/isono_family.csv

id,last_name,first_name,age
1,磯野,カツオ,11
2,磯野,ワカメ,9
3,フグ田,サザエ,23
4,フグ田,マスオ,32
5,フグ田,タラオ,3

src/main/resources/namino_family.csv

id,last_name,first_name,age
6,波野,ノリスケ,24
7,波野,タイコ,22
8,波野,イクラ,1

なんとなくsrc/main/resourcesに置いているのでクラスパス上に存在していますが、クラスパスを監視したいわけではありません。

パッケージングして

$ mvn package

起動。

$ java -jar target/batch-integration-example-0.0.1-SNAPSHOT.jar

Webアプリケーションではないのですが、このままサーバーとして起動してくれています。

この時、src/main/resources/application.propertiesに以下の設定を入れていました。

spring.batch.job.enabled=false

これを入れていない場合は、Spring Batchのデフォルトの動作としてすべてのJobをしようとするので注意が必要です。 今回はSpring Batch Integrationを使ったJobしかなく、かつディレクトリ監視でJobを起動したいのでspring.batch.job.enabledfalseとして
アプリケーション起動時にJobを実行しないようにしました。

なお、ディレクトリ監視を行うと、監視対象のディレクトリがない場合は作成しようとするみたいですね。

$ ll target/files
合計 8
drwxrwxr-x 2 xxxxx xxxxx 4096  5月 20 01:19 ./
drwxrwxr-x 9 xxxxx xxxxx 4096  5月 20 01:19 ../

最初にテーブルの状態を確認。

mysql> select * from person;
Empty set (0.00 sec)

1ファイル目を監視対象のディレクトリにコピーしてみます。

$ cp src/main/resources/isono_family.csv target/files

すると、アプリケーションが動き始めます。

2022-05-20 01:22:30.249  INFO 18890 --- [   scheduling-1] polling file                             : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ddfff35a-5d0c-89ed-905e-f00d2fc00a6c, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350247}]
2022-05-20 01:22:30.249  INFO 18890 --- [   scheduling-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2022-05-20 01:22:30.307  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=37b5abff-b5b8-61ab-e0c1-d73813bf4c2f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350307}]
2022-05-20 01:22:30.509  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}]
2022-05-20 01:22:30.668  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [loadFileToDatabaseStep]
2022-05-20 01:22:30.975  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [loadFileToDatabaseStep] executed in 306ms
2022-05-20 01:22:31.061  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [deleteFileStep]
2022-05-20 01:22:31.169  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [deleteFileStep] executed in 108ms
2022-05-20 01:22:31.224  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 668ms
2022-05-20 01:22:31.231  INFO 18890 --- [   scheduling-1] job execution result                     : GenericMessage [payload=JobExecution: id=1, version=2, startTime=Fri May 20 01:22:30 JST 2022, endTime=Fri May 20 01:22:31 JST 2022, lastUpdated=Fri May 20 01:22:31 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=82978876-8ca8-12d7-4fe6-9faeaf0312ec, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977351224}]

ログを見ると、ファイルを検出して

2022-05-20 01:22:30.249  INFO 18890 --- [   scheduling-1] polling file                             : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ddfff35a-5d0c-89ed-905e-f00d2fc00a6c, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350247}]

Jobに変換。

2022-05-20 01:22:30.307  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=37b5abff-b5b8-61ab-e0c1-d73813bf4c2f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350307}]

Spring BatchのJobを実行。

2022-05-20 01:22:30.668  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [loadFileToDatabaseStep]
2022-05-20 01:22:30.975  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [loadFileToDatabaseStep] executed in 306ms
2022-05-20 01:22:31.061  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [deleteFileStep]
2022-05-20 01:22:31.169  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [deleteFileStep] executed in 108ms
2022-05-20 01:22:31.224  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 668ms

終了、という様子がわかります。

2022-05-20 01:22:31.231  INFO 18890 --- [   scheduling-1] job execution result                     : GenericMessage [payload=JobExecution: id=1, version=2, startTime=Fri May 20 01:22:30 JST 2022, endTime=Fri May 20 01:22:31 JST 2022, lastUpdated=Fri May 20 01:22:31 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=82978876-8ca8-12d7-4fe6-9faeaf0312ec, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977351224}]

データを確認してみましょう。

mysql> select * from person;
+----+-----------+------------+------+---------------------+
| id | last_name | first_name | age  | updated_time        |
+----+-----------+------------+------+---------------------+
|  1 | 磯野      | カツオ     |   11 | 2022-05-20 01:22:31 |
|  2 | 磯野      | ワカメ     |    9 | 2022-05-20 01:22:31 |
|  3 | フグ田    | サザエ     |   23 | 2022-05-20 01:22:31 |
|  4 | フグ田    | マスオ     |   32 | 2022-05-20 01:22:31 |
|  5 | フグ田    | タラオ     |    3 | 2022-05-20 01:22:31 |
+----+-----------+------------+------+---------------------+
5 rows in set (0.00 sec)

ちゃんと取り込まれていますね。

監視対象のディレクトリに配置したファイルは、Spring BatchのJobに含まれていたTaskletにより削除されています。

$ tree target/files
target/files

0 directories, 0 files

もうひとつのファイルを、監視対象のディレクトリにコピーしてみます。

$ cp src/main/resources/namino_family.csv target/files

先ほどと同様、配置されたファイルを検出してSpring Batchが起動しました。

2022-05-20 01:31:28.237  INFO 18890 --- [   scheduling-1] polling file                             : GenericMessage [payload=target/files/namino_family.csv, headers={file_originalFile=target/files/namino_family.csv, id=2e9cac33-a5be-8eeb-a471-d47f579906cc, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888237}]
2022-05-20 01:31:28.278  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}, headers={file_originalFile=target/files/namino_family.csv, id=c404ecd5-4b5a-2593-dc12-13b0621b3b54, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888278}]
2022-05-20 01:31:28.365  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}]
2022-05-20 01:31:28.448  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [loadFileToDatabaseStep]
2022-05-20 01:31:28.581  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [loadFileToDatabaseStep] executed in 132ms
2022-05-20 01:31:28.658  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [deleteFileStep]
2022-05-20 01:31:28.834  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [deleteFileStep] executed in 176ms
2022-05-20 01:31:28.939  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}] and the following status: [COMPLETED] in 538ms
2022-05-20 01:31:28.940  INFO 18890 --- [   scheduling-1] job execution result                     : GenericMessage [payload=JobExecution: id=2, version=2, startTime=Fri May 20 01:31:28 JST 2022, endTime=Fri May 20 01:31:28 JST 2022, lastUpdated=Fri May 20 01:31:28 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=2, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}], headers={file_originalFile=target/files/namino_family.csv, id=f50295b0-dd2b-5d49-6196-d61c5507b3d5, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888940}]

取り込まれたデータ。

mysql> select * from person;
+----+-----------+--------------+------+---------------------+
| id | last_name | first_name   | age  | updated_time        |
+----+-----------+--------------+------+---------------------+
|  1 | 磯野      | カツオ       |   11 | 2022-05-20 01:22:31 |
|  2 | 磯野      | ワカメ       |    9 | 2022-05-20 01:22:31 |
|  3 | フグ田    | サザエ       |   23 | 2022-05-20 01:22:31 |
|  4 | フグ田    | マスオ       |   32 | 2022-05-20 01:22:31 |
|  5 | フグ田    | タラオ       |    3 | 2022-05-20 01:22:31 |
|  6 | 波野      | ノリスケ     |   24 | 2022-05-20 01:31:29 |
|  7 | 波野      | タイコ       |   22 | 2022-05-20 01:31:29 |
|  8 | 波野      | イクラ       |    1 | 2022-05-20 01:31:29 |
+----+-----------+--------------+------+---------------------+
8 rows in set (0.00 sec)

ここで、もう1度最初に取り込んだファイルをコピーしてみます。

$ cp src/main/resources/isono_family.csv target/files

最初と同じように、Spring Batchが起動します。

2022-05-20 01:33:40.237  INFO 18890 --- [   scheduling-1] polling file                             : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ef090587-8d4d-0932-7b89-62d2418d8391, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020237}]
2022-05-20 01:33:40.252  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=f2603171-64a4-9093-0621-0125698264db, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020252}]
2022-05-20 01:33:40.320  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}]
2022-05-20 01:33:40.397  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [loadFileToDatabaseStep]
2022-05-20 01:33:40.504  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [loadFileToDatabaseStep] executed in 107ms
2022-05-20 01:33:40.571  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [deleteFileStep]
2022-05-20 01:33:40.631  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [deleteFileStep] executed in 60ms
2022-05-20 01:33:40.691  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 348ms
2022-05-20 01:33:40.692  INFO 18890 --- [   scheduling-1] job execution result                     : GenericMessage [payload=JobExecution: id=3, version=2, startTime=Fri May 20 01:33:40 JST 2022, endTime=Fri May 20 01:33:40 JST 2022, lastUpdated=Fri May 20 01:33:40 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=3, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=23cb8db4-88cd-5bfb-6557-aca9275a8b1f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020691}]

Jobの構成にRunIdIncrementerを含めているので、run.idがインクリメントされているので与えるファイルパスが同じでも問題なく
起動します。

2022-05-20 01:33:40.252  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=f2603171-64a4-9093-0621-0125698264db, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020252}]

RunIdIncrementerを含めずに同じJobParametersJobを起動した場合は、Spring Batchの実行に失敗することになります。

同じデータのupdated_timeが更新されていることも確認しておきます。

mysql> select * from person;
+----+-----------+--------------+------+---------------------+
| id | last_name | first_name   | age  | updated_time        |
+----+-----------+--------------+------+---------------------+
|  1 | 磯野      | カツオ       |   11 | 2022-05-20 01:33:40 |
|  2 | 磯野      | ワカメ       |    9 | 2022-05-20 01:33:40 |
|  3 | フグ田    | サザエ       |   23 | 2022-05-20 01:33:40 |
|  4 | フグ田    | マスオ       |   32 | 2022-05-20 01:33:40 |
|  5 | フグ田    | タラオ       |    3 | 2022-05-20 01:33:40 |
|  6 | 波野      | ノリスケ     |   24 | 2022-05-20 01:31:29 |
|  7 | 波野      | タイコ       |   22 | 2022-05-20 01:31:29 |
|  8 | 波野      | イクラ       |    1 | 2022-05-20 01:31:29 |
+----+-----------+--------------+------+---------------------+
8 rows in set (0.00 sec)

OKですね。

これで、Spring BatchとSpring Integrationの組み合わせで動作確認できました。

オマケ:RunIdIncrementerの代わりにJsrJobParametersConverterを使った場合は?

今回、RunIdIncrementerを使用しましたがJsrJobParametersConverterを使ってもよいのでは?という疑問を持ったりはします。

RunIdIncrementerコメントアウトして

    @Bean
    public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory
                .get("loadFileToDatabaseJob")
                // .incrementer(new RunIdIncrementer())
                .start(loadFileToDatabaseStep(null))
                .next(deleteFileStep(null))
                .build();
    }

JsrJobParametersConverterをBeanとして定義してもよいのではないのでしょうか。

    @Bean
    public JsrJobParametersConverter jsrJobParametersConverter(DataSource dataSource) {
        return new JsrJobParametersConverter(dataSource);
    }

これは、うまくいきません。

この定義方法でJsrJobParametersConverterが使われるのは、Spring BatchをJobLauncherApplicationRunnerを使って起動した場合ですね。

https://github.com/spring-projects/spring-boot/blob/v2.6.7/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunner.java

もしくは、JsrJobOperatorを使ってもよさそうではあります。

https://github.com/spring-projects/spring-batch/blob/4.3.5/spring-batch-core/src/main/java/org/springframework/batch/core/jsr/launch/JsrJobOperator.java

Spring Batch Integrationを介して実行できなさそうではありますが…。

まとめ

Spring BatchとSpring Integrationを組み合わせた、Spring Batch Integrationを試してみました。

ドキュメントが少ないのでSpring BatchとSpring Integrationのつなぎ込みはやや苦労しましたが、そこをクリアすればSpring Batchと
Spring Integrationそれぞれに知識があれば簡単に使える感じがしましたね。

Spring IntegrationのInbound Channel Adapterを介して取得するMessageの単位がSpring BatchのJobの起動単位になるので、
その粒度で大丈夫ならSpring Integrationの機能が使えるのでSpring Batchを使った表現範囲が大きくなるのはよくわかりました。

覚えておくとよさそうですね。