これは、なにをしたくて書いたもの?
Spring Batchのドキュメントを見ていて、Spring Integrationと組み合わせられそうだったので、ちょっと試してみようかなと。
Spring Batch Integration
Spring Batchと、Spring Integrationの組み合わせについては、こちらに記載があります。
Spring Batch Integrationと呼ぶようです。
Spring Batchのユーザー、Spring Integrationのユーザーともに、双方を合わせて使うことで要件をより効率的に実現できるケースに
遭遇する可能性がある、という話のようです。
Many users of Spring Batch may encounter requirements that are outside the scope of Spring Batch but that may be efficiently and concisely implemented by using Spring Integration. Conversely, Spring Integration users may encounter Spring Batch requirements and need a way to efficiently integrate both frameworks.
Spring Batch Integration / Spring Batch Integration Introduction
バッチプロセスにメッセージングを追加することで、オペレーションの自動化と、キーとなる懸念事項の分離が可能になります。
Adding messaging to a batch process enables automation of operations and also separation and strategizing of key concerns.
ざっくり言うと、メッセージングの仕組みからSpring BatchのJob
を起動するためのもの、みたいです。
また、メッセージングの仕組みをジョブに埋め込み、ワークロードを複数のワーカー(リモートパーティショニング、リモートチャンク)に
分散したりもできるようです。
メッセージングを使ったSpring Batchの起動について。
Spring BatchをCommandLineJobRunner
を使って起動したり、WebアプリケーションにおいてJobOperator
を直接扱う使い方も
ありますが、より複雑なユースケースについて挙げてみます。
バッチジョブのデータを取得するために、リモートのFTPやSFTPサーバーへのポーリングを行う必要があったり、複数の異なる
データソースをサポートする必要があるかもしれません。
Maybe you need to poll a remote (S)FTP server to retrieve the data for the Batch Job or your application has to support multiple different data sources simultaneously.
Webだけでなく、FTPなどからソースとなるデータファイルを受け取るかもしれません。Spring Batchを呼び出す前に、入力ファイルの
変換が必要になる場合もあるかもしれません。
For example, you may receive data files not only from the web, but also from FTP and other sources. Maybe additional transformation of the input files is needed before invoking Spring Batch.
このような場合は、Spring Integrationとその多数のアダプターを使ってバッチジョブを実行すると、より強力な仕組みになるでしょう。
Therefore, it would be much more powerful to execute the batch job using Spring Integration and its numerous adapters.
たとえば、File Inbound Channel Adapterを使用してファイルシステム内のディレクトリを監視し、入力ファイルが到着したらすぐに
バッチジョブを開始することができます。さらに、複数の異なるアダプターを使用してSpring Integrationのフローを作成し、
Configurationのみで複数のソースから同時にバッチジョブのデータを取り込むこともできます。
For example, you can use a File Inbound Channel Adapter to monitor a directory in the file-system and start the Batch Job as soon as the input file arrives. Additionally, you can create Spring Integration flows that use multiple different adapters to easily ingest data for your batch jobs from multiple sources simultaneously using only configuration.
この仕組みを実現するために、Spring Batch Integrationでは以下を提供します。
- バッチジョブの起動を行う
JobLaunchingMessageHandler
JobLaunchingMessageHandler
の入力となるペイロードを持つJobLaunchRequest
JobLaunchRequest
は、バッチジョブを起動するのに必要なJobParameters
とJob
のラッパーです。
あとは、Spring IntegrationのMessage
からJobLaunchRequest
へ変換する方法、
JobExecution
について(ジョブのリポジトリを参照してステータスを確認すること)、
Spring Batch Integrationの設定例、
ジョブの中で使われるItemReader
の設定例といったものが続きます。
と、ドキュメントだけを眺めていてもわからないので、実際に使っていってみましょう。
お題
今回のお題は、このようにします。
- 特定のディレクトリをポーリングして、CSVファイルの配置を監視
- CSVファイルが配置されたら、Spring Batchを起動
- Spring Batchのジョブは、CSVファイルを読み込む
ItemReader
と、読み込んだデータをデータベースに反映するItemWriter
で構成- データベースへの反映にはJPAを使用
- 同じファイルを置いても、ジョブを起動して上書き更新する
- 上書きしたことがわかるように、データベース上のテーブルには更新時間を設ける
環境
今回の環境は、こちら。
$ java --version openjdk 17.0.3 2022-04-19 OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1) OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing) $ mvn --version Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-110-generic", arch: "amd64", family: "unix"
データベースには、MySQLを使用します。MySQLは172.17.0.2で動作しているものとします。
$ mysql --version mysql Ver 8.0.29 for Linux on x86_64 (MySQL Community Server - GPL)
プロジェクトを作成する
まずは、Spring Bootプロジェクトを作成します。依存関係にはbatch
、integration
、data-jpa
、mysql
を指定。
$ curl -s https://start.spring.io/starter.tgz \ -d bootVersion=2.6.7 \ -d javaVersion=17 \ -d name=batch-integration-example \ -d groupId=org.littlewings \ -d artifactId=batch-integration-example \ -d version=0.0.1-SNAPSHOT \ -d packageName=org.littlewings.spring.batch.integration \ -d dependencies=batch,integration,data-jpa,mysql \ -d baseDir=batch-integration-example | tar zxvf -
プロジェクト内へ移動。
$ cd batch-integration-example
<properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
生成されたソースコードは削除しておきます。
$ rm src/main/java/org/littlewings/spring/batch/integration/BatchIntegrationExampleApplication.java src/test/java/org/littlewings/spring/batch/integration/BatchIntegrationExampleApplicationTests.java
今回使いたい依存ライブラリが足りないのと、使わないものもあるので少し変更。
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <!-- <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-jpa</artifactId> </dependency> --> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-file</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-test</artifactId> <scope>test</scope> </dependency> </dependencies>
まず、今回の主題であるspring-batch-integration
を追加、それからSpring Integrationを使ったファイル監視のためにspring-integration-file
を追加。
そして、spring-integration-jpa
は使わないのでコメントアウトしておきます。
テーブル定義は、このようにしておきます。
src/main/resources/schema.sql
drop table if exists person; create table person ( id integer, last_name varchar(10), first_name varchar(10), age integer, updated_time datetime, primary key(id) );
アプリケーションの設定はこちら。
src/main/resources/application.properties
spring.datasource.url=jdbc:mysql://172.17.0.2:3306/practice?characterEncoding=utf-8 spring.datasource.username=kazuhira spring.datasource.password=password spring.sql.init.mode=always spring.batch.jdbc.initialize-schema=always spring.batch.job.enabled=false
schema.sql
は常に適用するようにして、Spring Batchが使用するテーブルも作成するようにします。
もうひとつポイントがあるのですが、それはまた説明します。
アプリケーションを作成する
では、アプリケーションを作成していきましょう。
まずはmain
クラスの作成。
src/main/java/org/littlewings/spring/batch/integration/App.java
package org.littlewings.spring.batch.integration; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication @EnableBatchProcessing public class App { public static void main(String... args) { SpringApplication.run(App.class, args); } }
@EnableBatchProcessing
アノテーションは必要です。Spring BatchのJobBuilderFactory
やStepBuilderFactory
などが
AutoConfigurationされるようにする必要があるので。
JPAのエンティティクラス。
src/main/java/org/littlewings/spring/batch/integration/Person.java
package org.littlewings.spring.batch.integration; import java.time.LocalDateTime; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.Id; import javax.persistence.Table; @Entity @Table(name = "person") public class Person { @Id @Column(name = "id") Integer id; @Column(name = "last_name") String lastName; @Column(name = "first_name") String firstName; @Column(name = "age") Integer age; @Column(name = "updated_time") LocalDateTime updatedTime; // getter/setterは省略 }
DDLには存在しなかったupdatedTime
というフィールドがありますが、こちらはItemProcessor
で設定することにします。
ここからは、こちらを見ながらSpring IntegrationやSpring Batchの設定を行っていきます。
最初は@Configuration
を付与したクラスの雛形を作成。
src/main/java/org/littlewings/spring/batch/integration/IntegrationJobConfig.java
package org.littlewings.spring.batch.integration; import java.io.File; import java.nio.file.Paths; import java.time.Duration; import java.time.LocalDateTime; import javax.persistence.EntityManagerFactory; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.integration.launch.JobLaunchingGateway; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.database.JpaItemWriter; import org.springframework.batch.item.database.builder.JpaItemWriterBuilder; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.PathResource; import org.springframework.core.io.Resource; import org.springframework.core.task.SyncTaskExecutor; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.file.dsl.Files; import org.springframework.integration.file.filters.SimplePatternFileListFilter; import org.springframework.integration.handler.LoggingHandler; @Configuration public class IntegrationJobConfig { // 後で }
CSVファイルの監視から書いていきましょう。Spring IntegrationのFile Supportを使います。
@Bean public IntegrationFlow integrationFlow() throws Exception { return IntegrationFlows .from( Files .inboundAdapter(new File("target/files")) .filter(new SimplePatternFileListFilter("*.csv")), c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L)) ) .log(LoggingHandler.Level.INFO, "polling file") .transform(fileMessageToJobRequest(null)) .log(LoggingHandler.Level.INFO, "transform job request") .handle(jobLaunchingGateway(null)) .log(LoggingHandler.Level.INFO, "job execution result") .get(); }
target/files
ディレクトリ配下の.csv
拡張子のファイルを、1秒おきに監視します。また、1回で取得するファイルはひとつにしています。
return IntegrationFlows .from( Files .inboundAdapter(new File("target/files")) .filter(new SimplePatternFileListFilter("*.csv")), c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L)) )
ファイルを検出したら、transform
した後に処理を行います(handle
)。各ステップの間にはログ出力を行うようにしましょう。
log
メソッドの第2引数はロガー名です。
.log(LoggingHandler.Level.INFO, "polling file") .transform(fileMessageToJobRequest(null)) .log(LoggingHandler.Level.INFO, "transform job request") .handle(jobLaunchingGateway(null)) .log(LoggingHandler.Level.INFO, "job execution result") .get();
transform
メソッドに渡しているのは、Spring IntegrationのMessage
をSpring BatchのJob
に変換するTransformer
です。
ですが、その前にhandle
メソッドに渡している処理から見ていきましょう。
こちらでは、JobLaunchingGateway
を作成しています。
@Bean public JobLaunchingGateway jobLaunchingGateway(JobRepository jobRepository) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository); jobLauncher.setTaskExecutor(new SyncTaskExecutor()); jobLauncher.afterPropertiesSet(); return new JobLaunchingGateway(jobLauncher); }
JobLaunchingGateway
は、Spring BatchのJob
を実行するためのMessageHandler
インターフェースの実装です。
JobLaunchingGateway (Spring Batch 4.3.5 API)
このため、JobLauncher
のインスタンスを必要とします。
Configuring and Running a Job / Configuring a JobLauncher
ここから先は、Spring BatchのJob
を構成していきます。
Job
の定義。同じJobParameters
の指定でも複数回起動できるように、RunIdIncrementer
を指定しています。
@Bean public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) { return jobBuilderFactory .get("loadFileToDatabaseJob") .incrementer(new RunIdIncrementer()) .start(loadFileToDatabaseStep(null)) .next(deleteFileStep(null)) .build(); }
Step
は2つ設けました。
ひとつ目のStep
は、ファイルを読み込んでデータベースに書き込むものです。
@Bean public Step loadFileToDatabaseStep(StepBuilderFactory stepBuilderFactory) { return stepBuilderFactory .get("loadFileToDatabaseStep") .<Person, Person>chunk(3) .reader(flatFilePersonItemReader(null)) .processor(updateTimeProcessor()) .writer(jpaPersonItemWriter(null)) .build(); }
ItemReader
、ItemProcessor
、ItemWriter
の定義はこちら。
CSVファイルを読み込む → エンティティにupdatedTime
を設定する → JPAでデータベースに書き込む、という流れになっています。
@Bean @StepScope public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) { Resource resource = new PathResource(path); return new FlatFileItemReaderBuilder<Person>() .name("flatFilePersonItemReader") .resource(resource) .encoding("UTF-8") .delimited() .names(new String[]{"id", "lastName", "firstName", "age"}) .linesToSkip(1) .targetType(Person.class) .build(); } @Bean @StepScope public ItemProcessor<Person, Person> updateTimeProcessor() { return (person) -> { person.setUpdatedTime(LocalDateTime.now()); return person; }; } @Bean @StepScope public JpaItemWriter<Person> jpaPersonItemWriter(EntityManagerFactory entityManagerFactory) { return new JpaItemWriterBuilder<Person>() .entityManagerFactory(entityManagerFactory) .build(); }
入力となるCSVファイルのパスは、JobParameters
として取得します。
@Bean @StepScope public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {
最後は、読み込んだファイルを削除するStep
です。ファイルを削除する処理を入れないと、監視対象のディレクトリにファイルが
残り続けるので、データの取り込み処理を延々と繰り返してしまいます。
@Bean public Step deleteFileStep(StepBuilderFactory stepBuilderFactory) { return stepBuilderFactory .get("deleteFileStep") .tasklet(deleteFileTasklet(null)) .build(); }
ファイルの削除を行うTasklet
。このようなStep
ではなく違う方法でもファイルの削除はできると思うのですが、どこかしらに削除の
処理を書かないといけなさそうなことは変わらないので、今回はこの方法を選択しました。
@Bean @StepScope public Tasklet deleteFileTasklet(@Value("#{jobParameters['input.file.path']}") String path) { return (contribution, context) -> { java.nio.file.Files.delete(Paths.get(path)); return RepeatStatus.FINISHED; }; }
そして、このSpring Batchで定義したJob
と
@Bean public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) { return jobBuilderFactory .get("loadFileToDatabaseJob") .incrementer(new RunIdIncrementer()) .start(loadFileToDatabaseStep(null)) .next(deleteFileStep(null)) .build(); }
Job
を実行するためにSpring IntegrationのMessage
をJob
に変換するのが、Transformer
でした。
public IntegrationFlow integrationFlow() throws Exception { return IntegrationFlows .from( Files .inboundAdapter(new File("target/files")) .filter(new SimplePatternFileListFilter("*.csv")), c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L)) ) .log(LoggingHandler.Level.INFO, "polling file") .transform(fileMessageToJobRequest(null)) .log(LoggingHandler.Level.INFO, "transform job request") .handle(jobLaunchingGateway(null)) .log(LoggingHandler.Level.INFO, "job execution result") .get();
その処理の内容は、こちら。
@Bean public FileMessageToJobRequest fileMessageToJobRequest(JobExplorer jobExplorer) { FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest(); fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null)); fileMessageToJobRequest.setJobExplorer(jobExplorer); return fileMessageToJobRequest; }
参考にしているドキュメントは、こちらですね。
Job
およびJobExplorer
を設定しています。
fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null));
fileMessageToJobRequest.setJobExplorer(jobExplorer);
クラスの定義は、こちら。
src/main/java/org/littlewings/spring/batch/integration/FileMessageToJobRequest.java
package org.littlewings.spring.batch.integration; import java.io.File; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.integration.launch.JobLaunchRequest; import org.springframework.format.annotation.DateTimeFormat; import org.springframework.integration.annotation.Transformer; import org.springframework.messaging.Message; public class FileMessageToJobRequest { Job job; JobExplorer jobExplorer; @Transformer public JobLaunchRequest toRequest(Message<File> message) { JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(jobExplorer); // for JobParametersIncrementer jobParametersBuilder.getNextJobParameters(job) // for JobParametersIncrementer .addString("input.file.path", message.getPayload().getAbsolutePath()); return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters()); } public Job getJob() { return job; } public void setJob(Job job) { this.job = job; } public JobExplorer getJobExplorer() { return jobExplorer; } public void setJobExplorer(JobExplorer jobExplorer) { this.jobExplorer = jobExplorer; } }
ポイントは、こちらのメソッドです。@Transformer
アノテーションを付与して、Spring IntegrationのFile Supportを使って検出した
CSVファイル(Message<File>
)をJobLaunchRequest
に変換します。
@Transformer public JobLaunchRequest toRequest(Message<File> message) { JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(jobExplorer); // for JobParametersIncrementer jobParametersBuilder.getNextJobParameters(job) // for JobParametersIncrementer .addString("input.file.path", message.getPayload().getAbsolutePath()); return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters()); }
CSVファイルのパスは、この部分でJobParameters
として設定しています。
.addString("input.file.path", message.getPayload().getAbsolutePath());
ここでの指定が、こちらに渡されるわけですね。
@Bean @StepScope public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {
ところでサンプルコードを見ると、JobExplorer
は指定していないのですが。
今回JobParametersIncrementer
(RunIdIncrementer
)を使ったのでJobParametersBuilder
にJobExplorer
の設定および、
JobParametersBuilder#getNextJobParameters`の呼び出しが必要になります。
もし、RunIdIncrementer
なしでできる限り同じJobParameters
で実行しようとすると、以下のようにJob
の実行の度に異なる
JobParamters
を設定するようなことをする必要が出てくるでしょうね。
@Transformer public JobLaunchRequest toRequest(Message<File> message) { JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); jobParametersBuilder .addString("input.file.path", message.getPayload().getAbsolutePath()) .addString("job.start.time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss"))); return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters()); }
これで、Spring BatchおよびSpring Integrationの設定はできました。
Spring BatchのJob
およびSpring IntegrationのIntegrationFlow
を定義しているクラス全体を載せると、こんな感じになっています。
src/main/java/org/littlewings/spring/batch/integration/IntegrationJobConfig.java
package org.littlewings.spring.batch.integration; import java.io.File; import java.nio.file.Paths; import java.time.Duration; import java.time.LocalDateTime; import javax.persistence.EntityManagerFactory; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.integration.launch.JobLaunchingGateway; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.database.JpaItemWriter; import org.springframework.batch.item.database.builder.JpaItemWriterBuilder; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.PathResource; import org.springframework.core.io.Resource; import org.springframework.core.task.SyncTaskExecutor; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.file.dsl.Files; import org.springframework.integration.file.filters.SimplePatternFileListFilter; import org.springframework.integration.handler.LoggingHandler; @Configuration public class IntegrationJobConfig { @Bean public IntegrationFlow integrationFlow() throws Exception { return IntegrationFlows .from( Files .inboundAdapter(new File("target/files")) .filter(new SimplePatternFileListFilter("*.csv")), c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L)) ) .log(LoggingHandler.Level.INFO, "polling file") .transform(fileMessageToJobRequest(null)) .log(LoggingHandler.Level.INFO, "transform job request") .handle(jobLaunchingGateway(null)) .log(LoggingHandler.Level.INFO, "job execution result") .get(); } @Bean public FileMessageToJobRequest fileMessageToJobRequest(JobExplorer jobExplorer) { FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest(); fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null)); fileMessageToJobRequest.setJobExplorer(jobExplorer); return fileMessageToJobRequest; } @Bean public JobLaunchingGateway jobLaunchingGateway(JobRepository jobRepository) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository); jobLauncher.setTaskExecutor(new SyncTaskExecutor()); jobLauncher.afterPropertiesSet(); return new JobLaunchingGateway(jobLauncher); } @Bean public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) { return jobBuilderFactory .get("loadFileToDatabaseJob") .incrementer(new RunIdIncrementer()) .start(loadFileToDatabaseStep(null)) .next(deleteFileStep(null)) .build(); } @Bean public Step loadFileToDatabaseStep(StepBuilderFactory stepBuilderFactory) { return stepBuilderFactory .get("loadFileToDatabaseStep") .<Person, Person>chunk(3) .reader(flatFilePersonItemReader(null)) .processor(updateTimeProcessor()) .writer(jpaPersonItemWriter(null)) .build(); } @Bean public Step deleteFileStep(StepBuilderFactory stepBuilderFactory) { return stepBuilderFactory .get("deleteFileStep") .tasklet(deleteFileTasklet(null)) .build(); } @Bean @StepScope public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) { Resource resource = new PathResource(path); return new FlatFileItemReaderBuilder<Person>() .name("flatFilePersonItemReader") .resource(resource) .encoding("UTF-8") .delimited() .names(new String[]{"id", "lastName", "firstName", "age"}) .linesToSkip(1) .targetType(Person.class) .build(); } @Bean @StepScope public ItemProcessor<Person, Person> updateTimeProcessor() { return (person) -> { person.setUpdatedTime(LocalDateTime.now()); return person; }; } @Bean @StepScope public JpaItemWriter<Person> jpaPersonItemWriter(EntityManagerFactory entityManagerFactory) { return new JpaItemWriterBuilder<Person>() .entityManagerFactory(entityManagerFactory) .build(); } @Bean @StepScope public Tasklet deleteFileTasklet(@Value("#{jobParameters['input.file.path']}") String path) { return (contribution, context) -> { java.nio.file.Files.delete(Paths.get(path)); return RepeatStatus.FINISHED; }; } }
動作確認してみる
では、作成したアプリケーションを動かしてみましょう。
読み込むCSVファイルを、2つ用意します。
src/main/resources/isono_family.csv
id,last_name,first_name,age 1,磯野,カツオ,11 2,磯野,ワカメ,9 3,フグ田,サザエ,23 4,フグ田,マスオ,32 5,フグ田,タラオ,3
src/main/resources/namino_family.csv
id,last_name,first_name,age 6,波野,ノリスケ,24 7,波野,タイコ,22 8,波野,イクラ,1
なんとなくsrc/main/resources
に置いているのでクラスパス上に存在していますが、クラスパスを監視したいわけではありません。
パッケージングして
$ mvn package
起動。
$ java -jar target/batch-integration-example-0.0.1-SNAPSHOT.jar
Webアプリケーションではないのですが、このままサーバーとして起動してくれています。
この時、src/main/resources/application.properties
に以下の設定を入れていました。
spring.batch.job.enabled=false
これを入れていない場合は、Spring Batchのデフォルトの動作としてすべてのJob
をしようとするので注意が必要です。
今回はSpring Batch Integrationを使ったJob
しかなく、かつディレクトリ監視でJob
を起動したいのでspring.batch.job.enabled
をfalse
として
アプリケーション起動時にJob
を実行しないようにしました。
なお、ディレクトリ監視を行うと、監視対象のディレクトリがない場合は作成しようとするみたいですね。
$ ll target/files 合計 8 drwxrwxr-x 2 xxxxx xxxxx 4096 5月 20 01:19 ./ drwxrwxr-x 9 xxxxx xxxxx 4096 5月 20 01:19 ../
最初にテーブルの状態を確認。
mysql> select * from person; Empty set (0.00 sec)
1ファイル目を監視対象のディレクトリにコピーしてみます。
$ cp src/main/resources/isono_family.csv target/files
すると、アプリケーションが動き始めます。
2022-05-20 01:22:30.249 INFO 18890 --- [ scheduling-1] polling file : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ddfff35a-5d0c-89ed-905e-f00d2fc00a6c, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350247}] 2022-05-20 01:22:30.249 INFO 18890 --- [ scheduling-1] o.s.i.h.s.MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one. 2022-05-20 01:22:30.307 INFO 18890 --- [ scheduling-1] transform job request : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=37b5abff-b5b8-61ab-e0c1-d73813bf4c2f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350307}] 2022-05-20 01:22:30.509 INFO 18890 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] 2022-05-20 01:22:30.668 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadFileToDatabaseStep] 2022-05-20 01:22:30.975 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [loadFileToDatabaseStep] executed in 306ms 2022-05-20 01:22:31.061 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [deleteFileStep] 2022-05-20 01:22:31.169 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [deleteFileStep] executed in 108ms 2022-05-20 01:22:31.224 INFO 18890 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 668ms 2022-05-20 01:22:31.231 INFO 18890 --- [ scheduling-1] job execution result : GenericMessage [payload=JobExecution: id=1, version=2, startTime=Fri May 20 01:22:30 JST 2022, endTime=Fri May 20 01:22:31 JST 2022, lastUpdated=Fri May 20 01:22:31 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=82978876-8ca8-12d7-4fe6-9faeaf0312ec, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977351224}]
ログを見ると、ファイルを検出して
2022-05-20 01:22:30.249 INFO 18890 --- [ scheduling-1] polling file : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ddfff35a-5d0c-89ed-905e-f00d2fc00a6c, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350247}]
Job
に変換。
2022-05-20 01:22:30.307 INFO 18890 --- [ scheduling-1] transform job request : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=37b5abff-b5b8-61ab-e0c1-d73813bf4c2f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350307}]
Spring BatchのJob
を実行。
2022-05-20 01:22:30.668 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadFileToDatabaseStep] 2022-05-20 01:22:30.975 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [loadFileToDatabaseStep] executed in 306ms 2022-05-20 01:22:31.061 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [deleteFileStep] 2022-05-20 01:22:31.169 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [deleteFileStep] executed in 108ms 2022-05-20 01:22:31.224 INFO 18890 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 668ms
終了、という様子がわかります。
2022-05-20 01:22:31.231 INFO 18890 --- [ scheduling-1] job execution result : GenericMessage [payload=JobExecution: id=1, version=2, startTime=Fri May 20 01:22:30 JST 2022, endTime=Fri May 20 01:22:31 JST 2022, lastUpdated=Fri May 20 01:22:31 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=82978876-8ca8-12d7-4fe6-9faeaf0312ec, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977351224}]
データを確認してみましょう。
mysql> select * from person; +----+-----------+------------+------+---------------------+ | id | last_name | first_name | age | updated_time | +----+-----------+------------+------+---------------------+ | 1 | 磯野 | カツオ | 11 | 2022-05-20 01:22:31 | | 2 | 磯野 | ワカメ | 9 | 2022-05-20 01:22:31 | | 3 | フグ田 | サザエ | 23 | 2022-05-20 01:22:31 | | 4 | フグ田 | マスオ | 32 | 2022-05-20 01:22:31 | | 5 | フグ田 | タラオ | 3 | 2022-05-20 01:22:31 | +----+-----------+------------+------+---------------------+ 5 rows in set (0.00 sec)
ちゃんと取り込まれていますね。
監視対象のディレクトリに配置したファイルは、Spring BatchのJob
に含まれていたTasklet
により削除されています。
$ tree target/files target/files 0 directories, 0 files
もうひとつのファイルを、監視対象のディレクトリにコピーしてみます。
$ cp src/main/resources/namino_family.csv target/files
先ほどと同様、配置されたファイルを検出してSpring Batchが起動しました。
2022-05-20 01:31:28.237 INFO 18890 --- [ scheduling-1] polling file : GenericMessage [payload=target/files/namino_family.csv, headers={file_originalFile=target/files/namino_family.csv, id=2e9cac33-a5be-8eeb-a471-d47f579906cc, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888237}] 2022-05-20 01:31:28.278 INFO 18890 --- [ scheduling-1] transform job request : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}, headers={file_originalFile=target/files/namino_family.csv, id=c404ecd5-4b5a-2593-dc12-13b0621b3b54, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888278}] 2022-05-20 01:31:28.365 INFO 18890 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}] 2022-05-20 01:31:28.448 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadFileToDatabaseStep] 2022-05-20 01:31:28.581 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [loadFileToDatabaseStep] executed in 132ms 2022-05-20 01:31:28.658 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [deleteFileStep] 2022-05-20 01:31:28.834 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [deleteFileStep] executed in 176ms 2022-05-20 01:31:28.939 INFO 18890 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}] and the following status: [COMPLETED] in 538ms 2022-05-20 01:31:28.940 INFO 18890 --- [ scheduling-1] job execution result : GenericMessage [payload=JobExecution: id=2, version=2, startTime=Fri May 20 01:31:28 JST 2022, endTime=Fri May 20 01:31:28 JST 2022, lastUpdated=Fri May 20 01:31:28 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=2, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}], headers={file_originalFile=target/files/namino_family.csv, id=f50295b0-dd2b-5d49-6196-d61c5507b3d5, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888940}]
取り込まれたデータ。
mysql> select * from person; +----+-----------+--------------+------+---------------------+ | id | last_name | first_name | age | updated_time | +----+-----------+--------------+------+---------------------+ | 1 | 磯野 | カツオ | 11 | 2022-05-20 01:22:31 | | 2 | 磯野 | ワカメ | 9 | 2022-05-20 01:22:31 | | 3 | フグ田 | サザエ | 23 | 2022-05-20 01:22:31 | | 4 | フグ田 | マスオ | 32 | 2022-05-20 01:22:31 | | 5 | フグ田 | タラオ | 3 | 2022-05-20 01:22:31 | | 6 | 波野 | ノリスケ | 24 | 2022-05-20 01:31:29 | | 7 | 波野 | タイコ | 22 | 2022-05-20 01:31:29 | | 8 | 波野 | イクラ | 1 | 2022-05-20 01:31:29 | +----+-----------+--------------+------+---------------------+ 8 rows in set (0.00 sec)
ここで、もう1度最初に取り込んだファイルをコピーしてみます。
$ cp src/main/resources/isono_family.csv target/files
最初と同じように、Spring Batchが起動します。
2022-05-20 01:33:40.237 INFO 18890 --- [ scheduling-1] polling file : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ef090587-8d4d-0932-7b89-62d2418d8391, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020237}] 2022-05-20 01:33:40.252 INFO 18890 --- [ scheduling-1] transform job request : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=f2603171-64a4-9093-0621-0125698264db, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020252}] 2022-05-20 01:33:40.320 INFO 18890 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] 2022-05-20 01:33:40.397 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadFileToDatabaseStep] 2022-05-20 01:33:40.504 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [loadFileToDatabaseStep] executed in 107ms 2022-05-20 01:33:40.571 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [deleteFileStep] 2022-05-20 01:33:40.631 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [deleteFileStep] executed in 60ms 2022-05-20 01:33:40.691 INFO 18890 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 348ms 2022-05-20 01:33:40.692 INFO 18890 --- [ scheduling-1] job execution result : GenericMessage [payload=JobExecution: id=3, version=2, startTime=Fri May 20 01:33:40 JST 2022, endTime=Fri May 20 01:33:40 JST 2022, lastUpdated=Fri May 20 01:33:40 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=3, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=23cb8db4-88cd-5bfb-6557-aca9275a8b1f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020691}]
Job
の構成にRunIdIncrementer
を含めているので、run.id
がインクリメントされているので与えるファイルパスが同じでも問題なく
起動します。
2022-05-20 01:33:40.252 INFO 18890 --- [ scheduling-1] transform job request : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=f2603171-64a4-9093-0621-0125698264db, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020252}]
RunIdIncrementer
を含めずに同じJobParameters
でJob
を起動した場合は、Spring Batchの実行に失敗することになります。
同じデータのupdated_time
が更新されていることも確認しておきます。
mysql> select * from person; +----+-----------+--------------+------+---------------------+ | id | last_name | first_name | age | updated_time | +----+-----------+--------------+------+---------------------+ | 1 | 磯野 | カツオ | 11 | 2022-05-20 01:33:40 | | 2 | 磯野 | ワカメ | 9 | 2022-05-20 01:33:40 | | 3 | フグ田 | サザエ | 23 | 2022-05-20 01:33:40 | | 4 | フグ田 | マスオ | 32 | 2022-05-20 01:33:40 | | 5 | フグ田 | タラオ | 3 | 2022-05-20 01:33:40 | | 6 | 波野 | ノリスケ | 24 | 2022-05-20 01:31:29 | | 7 | 波野 | タイコ | 22 | 2022-05-20 01:31:29 | | 8 | 波野 | イクラ | 1 | 2022-05-20 01:31:29 | +----+-----------+--------------+------+---------------------+ 8 rows in set (0.00 sec)
OKですね。
これで、Spring BatchとSpring Integrationの組み合わせで動作確認できました。
オマケ:RunIdIncrementerの代わりにJsrJobParametersConverterを使った場合は?
今回、RunIdIncrementer
を使用しましたがJsrJobParametersConverter
を使ってもよいのでは?という疑問を持ったりはします。
RunIdIncrementer
をコメントアウトして
@Bean public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) { return jobBuilderFactory .get("loadFileToDatabaseJob") // .incrementer(new RunIdIncrementer()) .start(loadFileToDatabaseStep(null)) .next(deleteFileStep(null)) .build(); }
JsrJobParametersConverter
をBeanとして定義してもよいのではないのでしょうか。
@Bean public JsrJobParametersConverter jsrJobParametersConverter(DataSource dataSource) { return new JsrJobParametersConverter(dataSource); }
これは、うまくいきません。
この定義方法でJsrJobParametersConverter
が使われるのは、Spring BatchをJobLauncherApplicationRunner
を使って起動した場合ですね。
もしくは、JsrJobOperator
を使ってもよさそうではあります。
Spring Batch Integrationを介して実行できなさそうではありますが…。
まとめ
Spring BatchとSpring Integrationを組み合わせた、Spring Batch Integrationを試してみました。
ドキュメントが少ないのでSpring BatchとSpring Integrationのつなぎ込みはやや苦労しましたが、そこをクリアすればSpring Batchと
Spring Integrationそれぞれに知識があれば簡単に使える感じがしましたね。
Spring IntegrationのInbound Channel Adapterを介して取得するMessage
の単位がSpring BatchのJob
の起動単位になるので、
その粒度で大丈夫ならSpring Integrationの機能が使えるのでSpring Batchを使った表現範囲が大きくなるのはよくわかりました。
覚えておくとよさそうですね。