CLOVER🍀

That was when it all began.

Docker EngineLinuxでコンテナからホスト偎のサヌビスにアクセスするhost.docker.internal

これは、なにをしたくお曞いたもの

Docker Desktopには、host.docker.internalずいうホスト偎のIPアドレスを参照する仕組みがあるようです。

Networking features in Docker Desktop for Windows / Features / Use cases and workarounds / I want to connect from a container to a service on the host

Docker Desktopを䜿っおいないので党然知らなかったのですが、Docker Engine䜿っおいるのはLinux版でもひず手間加えるず䜿えるようなので、
ちょっず詊しおみるこずにしたした。

コンテナからホスト偎のを参照できるhost.docker.internal

Windows版のhost.docker.internalの説明を芋おみたす。

host.docker.internalはホストの内郚IPアドレスを解決する特別な名前です。Docker Desktop for Windows以倖の本番環境では機胜しないずも
曞かれおいたす。

We recommend that you connect to the special DNS name host.docker.internal which resolves to the internal IP address used by the host. This is for development purpose and will not work in a production environment outside of Docker Desktop for Windows.

Networking features in Docker Desktop for Windows / Features / Use cases and workarounds / I want to connect from a container to a service on the host

macOS版ずLinux版ではどうなのでしょう。曞かれおいるこずは、ほが同じです。

Networking features in Docker Desktop for Mac / Features / Use cases and workarounds / I want to connect from a container to a service on the host

Networking features in Docker Desktop for Mac / Features / Use cases and workarounds / I want to connect from a container to a service on the host

ずはいえ、Linux版のDocker Desktopは最近出おきたばかりのものですが。

The Magic of Docker Desktop is Now Available on Linux - Docker

[速報]「Docker Desktop for Linux」が登場、WindowsやMac版と同じ機能や操作を提供、Raspberry Pi OSにも対応 - Publickey

LinuxでDocker Engineしか䜿っおいない自分は、host.docker.internalの存圚をたったく知りたせんでした。

ずころで、リリヌスノヌトを芋おいるず、Docker Engineでも20.10.0からhost.docker.internalが䜿えるようになったみたいです。

Docker Engine release notes / 20.10.0

Support host.docker.internal in dockerd on Linux moby/moby#40007

Docker Engine release notes / 20.10.0 / Networking

Support host.docker.internal in dockerd on Linux by arkodg · Pull Request #40007 · moby/moby · GitHub

ずいうわけで、ちょっず詊しおみたす。

環境

今回の環境は、こちら。

$ docker version
Client: Docker Engine - Community
 Version:           20.10.16
 API version:       1.41
 Go version:        go1.17.10
 Git commit:        aa7e414
 Built:             Thu May 12 09:17:23 2022
 OS/Arch:           linux/amd64
 Context:           default
 Experimental:      true

Server: Docker Engine - Community
 Engine:
  Version:          20.10.16
  API version:      1.41 (minimum version 1.12)
  Go version:       go1.17.10
  Git commit:       f756502
  Built:            Thu May 12 09:15:28 2022
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          1.6.4
  GitCommit:        212e8b6fa2f44b9c21b2798135fc6fb7c53efc16
 runc:
  Version:          1.1.1
  GitCommit:        v1.1.1-0-g52de29d
 docker-init:
  Version:          0.19.0
  GitCommit:        de40ad0

Docker Composeも。

$ docker compose version
Docker Compose version v2.5.1

host.docker.internalを䜿っおみる

たずは、ホスト偎の準備をしたす。

ドキュメントに習っお、Python組み蟌みのWebサヌバヌを䜿いたしょう。

Networking features in Docker Desktop for Mac / Features / Use cases and workarounds / I want to connect from a container to a service on the host

ファむルの䜜成ずWebサヌバヌの起動。

$ echo 'from host' > message.txt
$ python3 -m http.server

確認。

$ curl localhost:8000/message.txt
from host

このWebサヌバヌに、コンテナ偎からアクセスしたい、ずいう話です。

コンテナを起動。

$ docker container run -it --rm ubuntu:20.04 bash
root@6b4790b61308:/# 

curlをむンストヌル。

# apt update && apt install -y curl

ホスト偎ぞアクセスを詊みたす。

# curl host.docker.internal:8000/message.txt
curl: (6) Could not resolve host: host.docker.internal

ホスト名が解決できない、ず蚀われたした。

/etc/hostsにも、確かに登録されおいたせんね。

# cat /etc/hosts
127.0.0.1       localhost
::1     localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
172.17.0.2      6b4790b61308

リリヌスノヌトに茉っおいたPull Requestを芋おみるず、--add-host=host.docker.internal:host-gatewayずいうオプションが必芁なようです。

Support host.docker.internal in dockerd on Linux by arkodg · Pull Request #40007 · moby/moby · GitHub

ずいうわけで、コンテナを1床終了しお

$ docker container run -it --rm --add-host=host.docker.internal:host-gateway ubuntu:20.04 bash
root@90f41cf6f930:/#

curlをむンストヌル。

# apt update && apt install -y curl

今床は、host.docker.internalでホスト偎にアクセスできるようになりたす。

# curl host.docker.internal:8000/message.txt
from host

/etc/hostsにも、host.docker.internalが増えおいたす。

# cat /etc/hosts
127.0.0.1       localhost
::1     localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
172.17.0.1      host.docker.internal
172.17.0.2      90f41cf6f930

確認できたしたね。

ホストのIPアドレスは、Docker Daemonに--host-gateway-ipオプションを䜿うこずで明瀺的な指定もできるようです。

Support host.docker.internal in dockerd on Linux by arkodg · Pull Request #40007 · moby/moby · GitHub

--add-host=host.docker.internal:host-gatewayをデフォルトで付䞎するように蚭定できないのかなずも思いたしたが、珟時点では
そのような話にはなっおいないみたいですね。

[RFD] add configuration option to add `host.docker.internal` by default · Issue #2290 · docker/cli · GitHub

Docker Composeでhost.docker.internalを䜿う

最埌に、host.docker.internalをDocker Composeでも䜿っおみたしょう。

Docker Composeの堎合は、extra_hostsを䜿いたす。

extra_hostsは、コンテナにホスト名のマッピングを远加する蚭定ですLinuxなら/etc/hostsを利甚。

extra_hosts adds hostname mappings to the container network interface configuration (/etc/hosts for Linux). Values MUST set hostname and IP address for additional hosts in the form of HOSTNAME:IP.

Compose specification / Services top-level element / extra_hosts

サンプルずしお、こんな定矩で䜜成。

compose.yaml

services:
  infinispan1:
    image: infinispan/server:13.0.10.Final
    extra_hosts:
      - "host.docker.internal:host-gateway"
  infinispan2:
    image: infinispan/server:13.0.10.Final
    extra_hosts:
      - "host.docker.internal:host-gateway"

起動しお

$ docker compose up

確認。

$ docker compose exec infinispan1 bash -c 'cat /etc/hosts | grep host.docker.internal'
172.17.0.1      host.docker.internal


$ docker compose exec infinispan2 bash -c 'cat /etc/hosts | grep host.docker.internal'
172.17.0.1      host.docker.internal

host.docker.internalを䜿った確認自䜓は、省略したす。

たた、compose.yamlからextra_hosts、host.docker.internal:host-gatewayの定矩を削陀するず、圓然ですがhost.docker.internalは
/etc/hostsに登録されなくなりたす。

これで、今回の確認はOKずしたしょう。

たずめ

host.docker.internalを䜿っお、コンテナからホスト偎のサヌビスにアクセスしおみたした。

䜿い方には泚意した方がいい気がしたすが、知っおおくずそれはそれで䟿利なのかな、ず。

Spring Batch × Spring IntegrationSpring Batch Integrationを詊す

これは、なにをしたくお曞いたもの

Spring Batchのドキュメントを芋おいお、Spring Integrationず組み合わせられそうだったので、ちょっず詊しおみようかなず。

Spring Batch Integration

Spring Batchず、Spring Integrationの組み合わせに぀いおは、こちらに蚘茉がありたす。

Spring Batch Integration

Spring Batch Integrationず呌ぶようです。

Spring Batchのナヌザヌ、Spring Integrationのナヌザヌずもに、双方を合わせお䜿うこずで芁件をより効率的に実珟できるケヌスに
遭遇する可胜性がある、ずいう話のようです。

Many users of Spring Batch may encounter requirements that are outside the scope of Spring Batch but that may be efficiently and concisely implemented by using Spring Integration. Conversely, Spring Integration users may encounter Spring Batch requirements and need a way to efficiently integrate both frameworks.

Spring Batch Integration / Spring Batch Integration Introduction

バッチプロセスにメッセヌゞングを远加するこずで、オペレヌションの自動化ず、キヌずなる懞念事項の分離が可胜になりたす。

Adding messaging to a batch process enables automation of operations and also separation and strategizing of key concerns.

ざっくり蚀うず、メッセヌゞングの仕組みからSpring BatchのJobを起動するためのもの、みたいです。
たた、メッセヌゞングの仕組みをゞョブに埋め蟌み、ワヌクロヌドを耇数のワヌカヌリモヌトパヌティショニング、リモヌトチャンクに
分散したりもできるようです。

メッセヌゞングを䜿ったSpring Batchの起動に぀いお。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages

Spring BatchをCommandLineJobRunnerを䜿っお起動したり、WebアプリケヌションにおいおJobOperatorを盎接扱う䜿い方も
ありたすが、より耇雑なナヌスケヌスに぀いお挙げおみたす。

バッチゞョブのデヌタを取埗するために、リモヌトのFTPやSFTPサヌバヌぞのポヌリングを行う必芁があったり、耇数の異なる
デヌタ゜ヌスをサポヌトする必芁があるかもしれたせん。

Maybe you need to poll a remote (S)FTP server to retrieve the data for the Batch Job or your application has to support multiple different data sources simultaneously.

Webだけでなく、FTPなどから゜ヌスずなるデヌタファむルを受け取るかもしれたせん。Spring Batchを呌び出す前に、入力ファむルの
倉換が必芁になる堎合もあるかもしれたせん。

For example, you may receive data files not only from the web, but also from FTP and other sources. Maybe additional transformation of the input files is needed before invoking Spring Batch.

このような堎合は、Spring Integrationずその倚数のアダプタヌを䜿っおバッチゞョブを実行するず、より匷力な仕組みになるでしょう。

Therefore, it would be much more powerful to execute the batch job using Spring Integration and its numerous adapters.

たずえば、File Inbound Channel Adapterを䜿甚しおファむルシステム内のディレクトリを監芖し、入力ファむルが到着したらすぐに
バッチゞョブを開始するこずができたす。さらに、耇数の異なるアダプタヌを䜿甚しおSpring Integrationのフロヌを䜜成し、
Configurationのみで耇数の゜ヌスから同時にバッチゞョブのデヌタを取り蟌むこずもできたす。

For example, you can use a File Inbound Channel Adapter to monitor a directory in the file-system and start the Batch Job as soon as the input file arrives. Additionally, you can create Spring Integration flows that use multiple different adapters to easily ingest data for your batch jobs from multiple sources simultaneously using only configuration.

この仕組みを実珟するために、Spring Batch Integrationでは以䞋を提䟛したす。

  • バッチゞョブの起動を行うJobLaunchingMessageHandler
  • JobLaunchingMessageHandlerの入力ずなるペむロヌドを持぀JobLaunchRequest

JobLaunchRequestは、バッチゞョブを起動するのに必芁なJobParametersずJobのラッパヌです。

あずは、Spring IntegrationのMessageからJobLaunchRequestぞ倉換する方法、

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / Transforming a file into a JobLaunchRequest

JobExecutionに぀いおゞョブのリポゞトリを参照しおステヌタスを確認するこず、

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / The JobExecution Response

Spring Batch Integrationの蚭定䟋、

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / Spring Batch Integration Configuration

ゞョブの䞭で䜿われるItemReaderの蚭定䟋ずいったものが続きたす。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / Example ItemReader Configuration

ず、ドキュメントだけを眺めおいおもわからないので、実際に䜿っおいっおみたしょう。

お題

今回のお題は、このようにしたす。

  • 特定のディレクトリをポヌリングしお、CSVファむルの配眮を監芖
  • CSVファむルが配眮されたら、Spring Batchを起動
  • Spring Batchのゞョブは、CSVファむルを読み蟌むItemReaderず、読み蟌んだデヌタをデヌタベヌスに反映するItemWriterで構成
    • デヌタベヌスぞの反映にはJPAを䜿甚
  • 同じファむルを眮いおも、ゞョブを起動しお䞊曞き曎新する
    • 䞊曞きしたこずがわかるように、デヌタベヌス䞊のテヌブルには曎新時間を蚭ける

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.3 2022-04-19
OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-110-generic", arch: "amd64", family: "unix"

デヌタベヌスには、MySQLを䜿甚したす。MySQLは172.17.0.2で動䜜しおいるものずしたす。

$ mysql --version
mysql  Ver 8.0.29 for Linux on x86_64 (MySQL Community Server - GPL)

プロゞェクトを䜜成する

たずは、Spring Bootプロゞェクトを䜜成したす。䟝存関係にはbatch、integration、data-jpa、mysqlを指定。

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.6.7 \
  -d javaVersion=17 \
  -d name=batch-integration-example \
  -d groupId=org.littlewings \
  -d artifactId=batch-integration-example \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.batch.integration \
  -d dependencies=batch,integration,data-jpa,mysql \
  -d baseDir=batch-integration-example | tar zxvf -

プロゞェクト内ぞ移動。

$ cd batch-integration-example

Maven䟝存関係およびプラグむン蚭定。

        <properties>
                <java.version>17</java.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-batch</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-data-jpa</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-integration</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-jpa</artifactId>
                </dependency>

                <dependency>
                        <groupId>mysql</groupId>
                        <artifactId>mysql-connector-java</artifactId>
                        <scope>runtime</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.batch</groupId>
                        <artifactId>spring-batch-test</artifactId>
                        <scope>test</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-test</artifactId>
                        <scope>test</scope>
                </dependency>
        </dependencies>

        <build>
                <plugins>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
                        </plugin>
                </plugins>
        </build>

生成された゜ヌスコヌドは削陀しおおきたす。

$ rm src/main/java/org/littlewings/spring/batch/integration/BatchIntegrationExampleApplication.java src/test/java/org/littlewings/spring/batch/integration/BatchIntegrationExampleApplicationTests.java

今回䜿いたい䟝存ラむブラリが足りないのず、䜿わないものもあるので少し倉曎。

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <!--
       <dependency>
           <groupId>org.springframework.integration</groupId>
           <artifactId>spring-integration-jpa</artifactId>
       </dependency>
       -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

たず、今回の䞻題であるspring-batch-integrationを远加、それからSpring Integrationを䜿ったファむル監芖のためにspring-integration-fileを远加。
そしお、spring-integration-jpaは䜿わないのでコメントアりトしおおきたす。

テヌブル定矩は、このようにしおおきたす。

src/main/resources/schema.sql

drop table if exists person;

create table person (
  id integer,
  last_name varchar(10),
  first_name varchar(10),
  age integer,
  updated_time datetime,
  primary key(id)
);

アプリケヌションの蚭定はこちら。

src/main/resources/application.properties

spring.datasource.url=jdbc:mysql://172.17.0.2:3306/practice?characterEncoding=utf-8
spring.datasource.username=kazuhira
spring.datasource.password=password

spring.sql.init.mode=always
spring.batch.jdbc.initialize-schema=always

spring.batch.job.enabled=false

schema.sqlは垞に適甚するようにしお、Spring Batchが䜿甚するテヌブルも䜜成するようにしたす。

もうひず぀ポむントがあるのですが、それはたた説明したす。

アプリケヌションを䜜成する

では、アプリケヌションを䜜成しおいきたしょう。

たずはmainクラスの䜜成。

src/main/java/org/littlewings/spring/batch/integration/App.java

package org.littlewings.spring.batch.integration;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

@EnableBatchProcessingアノテヌションは必芁です。Spring BatchのJobBuilderFactoryやStepBuilderFactoryなどが
AutoConfigurationされるようにする必芁があるので。

JPAの゚ンティティクラス。

src/main/java/org/littlewings/spring/batch/integration/Person.java

package org.littlewings.spring.batch.integration;

import java.time.LocalDateTime;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "person")
public class Person {
    @Id
    @Column(name = "id")
    Integer id;

    @Column(name = "last_name")
    String lastName;

    @Column(name = "first_name")
    String firstName;

    @Column(name = "age")
    Integer age;

    @Column(name = "updated_time")
    LocalDateTime updatedTime;

    // gettersetterは省略
}

DDLには存圚しなかったupdatedTimeずいうフィヌルドがありたすが、こちらはItemProcessorで蚭定するこずにしたす。

ここからは、こちらを芋ながらSpring IntegrationやSpring Batchの蚭定を行っおいきたす。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / Spring Batch Integration Configuration

最初は@Configurationを付䞎したクラスの雛圢を䜜成。

src/main/java/org/littlewings/spring/batch/integration/IntegrationJobConfig.java

package org.littlewings.spring.batch.integration;

import java.io.File;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
import javax.persistence.EntityManagerFactory;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.integration.launch.JobLaunchingGateway;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.PathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.handler.LoggingHandler;

@Configuration
public class IntegrationJobConfig {

    // 埌で
}

CSVファむルの監芖から曞いおいきたしょう。Spring IntegrationのFile Supportを䜿いたす。

File Support

    @Bean
    public IntegrationFlow integrationFlow() throws Exception {
        return IntegrationFlows
                .from(
                        Files
                                .inboundAdapter(new File("target/files"))
                                .filter(new SimplePatternFileListFilter("*.csv")),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L))
                )
                .log(LoggingHandler.Level.INFO, "polling file")
                .transform(fileMessageToJobRequest(null))
                .log(LoggingHandler.Level.INFO, "transform job request")
                .handle(jobLaunchingGateway(null))
                .log(LoggingHandler.Level.INFO, "job execution result")
                .get();
    }

target/filesディレクトリ配䞋の.csv拡匵子のファむルを、1秒おきに監芖したす。たた、1回で取埗するファむルはひず぀にしおいたす。

        return IntegrationFlows
                .from(
                        Files
                                .inboundAdapter(new File("target/files"))
                                .filter(new SimplePatternFileListFilter("*.csv")),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L))
                )

ファむルを怜出したら、transformした埌に凊理を行いたすhandle。各ステップの間にはログ出力を行うようにしたしょう。
logメ゜ッドの第2匕数はロガヌ名です。

                .log(LoggingHandler.Level.INFO, "polling file")
                .transform(fileMessageToJobRequest(null))
                .log(LoggingHandler.Level.INFO, "transform job request")
                .handle(jobLaunchingGateway(null))
                .log(LoggingHandler.Level.INFO, "job execution result")
                .get();

transformメ゜ッドに枡しおいるのは、Spring IntegrationのMessageをSpring BatchのJobに倉換するTransformerです。

ですが、その前にhandleメ゜ッドに枡しおいる凊理から芋おいきたしょう。

こちらでは、JobLaunchingGatewayを䜜成しおいたす。

    @Bean
    public JobLaunchingGateway jobLaunchingGateway(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SyncTaskExecutor());
        jobLauncher.afterPropertiesSet();

        return new JobLaunchingGateway(jobLauncher);
    }

JobLaunchingGatewayは、Spring BatchのJobを実行するためのMessageHandlerむンタヌフェヌスの実装です。

JobLaunchingGateway (Spring Batch 4.3.5 API)

このため、JobLauncherのむンスタンスを必芁ずしたす。

Configuring and Running a Job / Configuring a JobLauncher

ここから先は、Spring BatchのJobを構成しおいきたす。

Jobの定矩。同じJobParametersの指定でも耇数回起動できるように、RunIdIncrementerを指定しおいたす。

    @Bean
    public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory
                .get("loadFileToDatabaseJob")
                .incrementer(new RunIdIncrementer())
                .start(loadFileToDatabaseStep(null))
                .next(deleteFileStep(null))
                .build();
    }

Stepは2぀蚭けたした。

ひず぀目のStepは、ファむルを読み蟌んでデヌタベヌスに曞き蟌むものです。

    @Bean
    public Step loadFileToDatabaseStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory
                .get("loadFileToDatabaseStep")
                .<Person, Person>chunk(3)
                .reader(flatFilePersonItemReader(null))
                .processor(updateTimeProcessor())
                .writer(jpaPersonItemWriter(null))
                .build();
    }

ItemReader、ItemProcessor、ItemWriterの定矩はこちら。
CSVファむルを読み蟌む → ゚ンティティにupdatedTimeを蚭定する → JPAでデヌタベヌスに曞き蟌む、ずいう流れになっおいたす。

    @Bean
    @StepScope
    public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {
        Resource resource = new PathResource(path);

        return new FlatFileItemReaderBuilder<Person>()
                .name("flatFilePersonItemReader")
                .resource(resource)
                .encoding("UTF-8")
                .delimited()
                .names(new String[]{"id", "lastName", "firstName", "age"})
                .linesToSkip(1)
                .targetType(Person.class)
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<Person, Person> updateTimeProcessor() {
        return (person) -> {
            person.setUpdatedTime(LocalDateTime.now());
            return person;
        };
    }

    @Bean
    @StepScope
    public JpaItemWriter<Person> jpaPersonItemWriter(EntityManagerFactory entityManagerFactory) {
        return new JpaItemWriterBuilder<Person>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }

入力ずなるCSVファむルのパスは、JobParametersずしお取埗したす。

    @Bean
    @StepScope
    public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {

最埌は、読み蟌んだファむルを削陀するStepです。ファむルを削陀する凊理を入れないず、監芖察象のディレクトリにファむルが
残り続けるので、デヌタの取り蟌み凊理を延々ず繰り返しおしたいたす。

    @Bean
    public Step deleteFileStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory
                .get("deleteFileStep")
                .tasklet(deleteFileTasklet(null))
                .build();
    }

ファむルの削陀を行うTasklet。このようなStepではなく違う方法でもファむルの削陀はできるず思うのですが、どこかしらに削陀の
凊理を曞かないずいけなさそうなこずは倉わらないので、今回はこの方法を遞択したした。

    @Bean
    @StepScope
    public Tasklet deleteFileTasklet(@Value("#{jobParameters['input.file.path']}") String path) {
        return (contribution, context) -> {
            java.nio.file.Files.delete(Paths.get(path));
            return RepeatStatus.FINISHED;
        };
    }

そしお、このSpring Batchで定矩したJobず

    @Bean
    public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory
                .get("loadFileToDatabaseJob")
                .incrementer(new RunIdIncrementer())
                .start(loadFileToDatabaseStep(null))
                .next(deleteFileStep(null))
                .build();
    }

Jobを実行するためにSpring IntegrationのMessageをJobに倉換するのが、Transformerでした。

    public IntegrationFlow integrationFlow() throws Exception {
        return IntegrationFlows
                .from(
                        Files
                                .inboundAdapter(new File("target/files"))
                                .filter(new SimplePatternFileListFilter("*.csv")),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L))
                )
                .log(LoggingHandler.Level.INFO, "polling file")
                .transform(fileMessageToJobRequest(null))
                .log(LoggingHandler.Level.INFO, "transform job request")
                .handle(jobLaunchingGateway(null))
                .log(LoggingHandler.Level.INFO, "job execution result")
                .get();

その凊理の内容は、こちら。

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest(JobExplorer jobExplorer) {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null));
        fileMessageToJobRequest.setJobExplorer(jobExplorer);

        return fileMessageToJobRequest;
    }

参考にしおいるドキュメントは、こちらですね。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / The JobExecution Response

JobおよびJobExplorerを蚭定しおいたす。

        fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null));
        fileMessageToJobRequest.setJobExplorer(jobExplorer);

クラスの定矩は、こちら。

src/main/java/org/littlewings/spring/batch/integration/FileMessageToJobRequest.java

package org.littlewings.spring.batch.integration;

import java.io.File;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

public class FileMessageToJobRequest {
    Job job;
    JobExplorer jobExplorer;

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(jobExplorer);  // for JobParametersIncrementer

        jobParametersBuilder.getNextJobParameters(job)  // for JobParametersIncrementer
                .addString("input.file.path", message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }

    public Job getJob() {
        return job;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    public JobExplorer getJobExplorer() {
        return jobExplorer;
    }

    public void setJobExplorer(JobExplorer jobExplorer) {
        this.jobExplorer = jobExplorer;
    }
}

ポむントは、こちらのメ゜ッドです。@Transformerアノテヌションを付䞎しお、Spring IntegrationのFile Supportを䜿っお怜出した
CSVファむルMessage<File>をJobLaunchRequestに倉換したす。

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(jobExplorer);  // for JobParametersIncrementer

        jobParametersBuilder.getNextJobParameters(job)  // for JobParametersIncrementer
                .addString("input.file.path", message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }

CSVファむルのパスは、この郚分でJobParametersずしお蚭定しおいたす。

                .addString("input.file.path", message.getPayload().getAbsolutePath());

ここでの指定が、こちらに枡されるわけですね。

    @Bean
    @StepScope
    public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {

ずころでサンプルコヌドを芋るず、JobExplorerは指定しおいないのですが。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / The JobExecution Response

今回JobParametersIncrementerRunIdIncrementerを䜿ったのでJobParametersBuilderにJobExplorerの蚭定および、
JobParametersBuilder#getNextJobParameters`の呌び出しが必芁になりたす。

もし、RunIdIncrementerなしでできる限り同じJobParametersで実行しようずするず、以䞋のようにJobの実行の床に異なる
JobParamtersを蚭定するようなこずをする必芁が出おくるでしょうね。

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder
                .addString("input.file.path", message.getPayload().getAbsolutePath())
                .addString("job.start.time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss")));

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }

これで、Spring BatchおよびSpring Integrationの蚭定はできたした。

Spring BatchのJobおよびSpring IntegrationのIntegrationFlowを定矩しおいるクラス党䜓を茉せるず、こんな感じになっおいたす。

src/main/java/org/littlewings/spring/batch/integration/IntegrationJobConfig.java

package org.littlewings.spring.batch.integration;

import java.io.File;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
import javax.persistence.EntityManagerFactory;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.integration.launch.JobLaunchingGateway;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.PathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.handler.LoggingHandler;

@Configuration
public class IntegrationJobConfig {
    @Bean
    public IntegrationFlow integrationFlow() throws Exception {
        return IntegrationFlows
                .from(
                        Files
                                .inboundAdapter(new File("target/files"))
                                .filter(new SimplePatternFileListFilter("*.csv")),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L))
                )
                .log(LoggingHandler.Level.INFO, "polling file")
                .transform(fileMessageToJobRequest(null))
                .log(LoggingHandler.Level.INFO, "transform job request")
                .handle(jobLaunchingGateway(null))
                .log(LoggingHandler.Level.INFO, "job execution result")
                .get();
    }

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest(JobExplorer jobExplorer) {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null));
        fileMessageToJobRequest.setJobExplorer(jobExplorer);

        return fileMessageToJobRequest;
    }

    @Bean
    public JobLaunchingGateway jobLaunchingGateway(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SyncTaskExecutor());
        jobLauncher.afterPropertiesSet();

        return new JobLaunchingGateway(jobLauncher);
    }

    @Bean
    public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory
                .get("loadFileToDatabaseJob")
                .incrementer(new RunIdIncrementer())
                .start(loadFileToDatabaseStep(null))
                .next(deleteFileStep(null))
                .build();
    }

    @Bean
    public Step loadFileToDatabaseStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory
                .get("loadFileToDatabaseStep")
                .<Person, Person>chunk(3)
                .reader(flatFilePersonItemReader(null))
                .processor(updateTimeProcessor())
                .writer(jpaPersonItemWriter(null))
                .build();
    }

    @Bean
    public Step deleteFileStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory
                .get("deleteFileStep")
                .tasklet(deleteFileTasklet(null))
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {
        Resource resource = new PathResource(path);

        return new FlatFileItemReaderBuilder<Person>()
                .name("flatFilePersonItemReader")
                .resource(resource)
                .encoding("UTF-8")
                .delimited()
                .names(new String[]{"id", "lastName", "firstName", "age"})
                .linesToSkip(1)
                .targetType(Person.class)
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<Person, Person> updateTimeProcessor() {
        return (person) -> {
            person.setUpdatedTime(LocalDateTime.now());
            return person;
        };
    }

    @Bean
    @StepScope
    public JpaItemWriter<Person> jpaPersonItemWriter(EntityManagerFactory entityManagerFactory) {
        return new JpaItemWriterBuilder<Person>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }

    @Bean
    @StepScope
    public Tasklet deleteFileTasklet(@Value("#{jobParameters['input.file.path']}") String path) {
        return (contribution, context) -> {
            java.nio.file.Files.delete(Paths.get(path));
            return RepeatStatus.FINISHED;
        };
    }
}

動䜜確認しおみる

では、䜜成したアプリケヌションを動かしおみたしょう。

読み蟌むCSVファむルを、2぀甚意したす。

src/main/resources/isono_family.csv

id,last_name,first_name,age
1,磯野,カツオ,11
2,磯野,ワカメ,9
3,フグ田,サザ゚,23
4,フグ田,マスオ,32
5,フグ田,タラオ,3

src/main/resources/namino_family.csv

id,last_name,first_name,age
6,波野,ノリスケ,24
7,波野,タむコ,22
8,波野,むクラ,1

なんずなくsrc/main/resourcesに眮いおいるのでクラスパス䞊に存圚しおいたすが、クラスパスを監芖したいわけではありたせん。

パッケヌゞングしお

$ mvn package

起動。

$ java -jar target/batch-integration-example-0.0.1-SNAPSHOT.jar

Webアプリケヌションではないのですが、このたたサヌバヌずしお起動しおくれおいたす。

この時、src/main/resources/application.propertiesに以䞋の蚭定を入れおいたした。

spring.batch.job.enabled=false

これを入れおいない堎合は、Spring Batchのデフォルトの動䜜ずしおすべおのJobをしようずするので泚意が必芁です。 今回はSpring Batch Integrationを䜿ったJobしかなく、か぀ディレクトリ監芖でJobを起動したいのでspring.batch.job.enabledをfalseずしお
アプリケヌション起動時にJobを実行しないようにしたした。

なお、ディレクトリ監芖を行うず、監芖察象のディレクトリがない堎合は䜜成しようずするみたいですね。

$ ll target/files
合蚈 8
drwxrwxr-x 2 xxxxx xxxxx 4096  5月 20 01:19 ./
drwxrwxr-x 9 xxxxx xxxxx 4096  5月 20 01:19 ../

最初にテヌブルの状態を確認。

mysql> select * from person;
Empty set (0.00 sec)

1ファむル目を監芖察象のディレクトリにコピヌしおみたす。

$ cp src/main/resources/isono_family.csv target/files

するず、アプリケヌションが動き始めたす。

2022-05-20 01:22:30.249  INFO 18890 --- [   scheduling-1] polling file                             : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ddfff35a-5d0c-89ed-905e-f00d2fc00a6c, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350247}]
2022-05-20 01:22:30.249  INFO 18890 --- [   scheduling-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2022-05-20 01:22:30.307  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=37b5abff-b5b8-61ab-e0c1-d73813bf4c2f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350307}]
2022-05-20 01:22:30.509  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}]
2022-05-20 01:22:30.668  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [loadFileToDatabaseStep]
2022-05-20 01:22:30.975  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [loadFileToDatabaseStep] executed in 306ms
2022-05-20 01:22:31.061  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [deleteFileStep]
2022-05-20 01:22:31.169  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [deleteFileStep] executed in 108ms
2022-05-20 01:22:31.224  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 668ms
2022-05-20 01:22:31.231  INFO 18890 --- [   scheduling-1] job execution result                     : GenericMessage [payload=JobExecution: id=1, version=2, startTime=Fri May 20 01:22:30 JST 2022, endTime=Fri May 20 01:22:31 JST 2022, lastUpdated=Fri May 20 01:22:31 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=82978876-8ca8-12d7-4fe6-9faeaf0312ec, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977351224}]

ログを芋るず、ファむルを怜出しお

2022-05-20 01:22:30.249  INFO 18890 --- [   scheduling-1] polling file                             : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ddfff35a-5d0c-89ed-905e-f00d2fc00a6c, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350247}]

Jobに倉換。

2022-05-20 01:22:30.307  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=37b5abff-b5b8-61ab-e0c1-d73813bf4c2f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350307}]

Spring BatchのJobを実行。

2022-05-20 01:22:30.668  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [loadFileToDatabaseStep]
2022-05-20 01:22:30.975  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [loadFileToDatabaseStep] executed in 306ms
2022-05-20 01:22:31.061  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [deleteFileStep]
2022-05-20 01:22:31.169  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [deleteFileStep] executed in 108ms
2022-05-20 01:22:31.224  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 668ms

終了、ずいう様子がわかりたす。

2022-05-20 01:22:31.231  INFO 18890 --- [   scheduling-1] job execution result                     : GenericMessage [payload=JobExecution: id=1, version=2, startTime=Fri May 20 01:22:30 JST 2022, endTime=Fri May 20 01:22:31 JST 2022, lastUpdated=Fri May 20 01:22:31 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=82978876-8ca8-12d7-4fe6-9faeaf0312ec, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977351224}]

デヌタを確認しおみたしょう。

mysql> select * from person;
+----+-----------+------------+------+---------------------+
| id | last_name | first_name | age  | updated_time        |
+----+-----------+------------+------+---------------------+
|  1 | 磯野      | カツオ     |   11 | 2022-05-20 01:22:31 |
|  2 | 磯野      | ワカメ     |    9 | 2022-05-20 01:22:31 |
|  3 | フグ田    | サザ゚     |   23 | 2022-05-20 01:22:31 |
|  4 | フグ田    | マスオ     |   32 | 2022-05-20 01:22:31 |
|  5 | フグ田    | タラオ     |    3 | 2022-05-20 01:22:31 |
+----+-----------+------------+------+---------------------+
5 rows in set (0.00 sec)

ちゃんず取り蟌たれおいたすね。

監芖察象のディレクトリに配眮したファむルは、Spring BatchのJobに含たれおいたTaskletにより削陀されおいたす。

$ tree target/files
target/files

0 directories, 0 files

もうひず぀のファむルを、監芖察象のディレクトリにコピヌしおみたす。

$ cp src/main/resources/namino_family.csv target/files

先ほどず同様、配眮されたファむルを怜出しおSpring Batchが起動したした。

2022-05-20 01:31:28.237  INFO 18890 --- [   scheduling-1] polling file                             : GenericMessage [payload=target/files/namino_family.csv, headers={file_originalFile=target/files/namino_family.csv, id=2e9cac33-a5be-8eeb-a471-d47f579906cc, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888237}]
2022-05-20 01:31:28.278  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}, headers={file_originalFile=target/files/namino_family.csv, id=c404ecd5-4b5a-2593-dc12-13b0621b3b54, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888278}]
2022-05-20 01:31:28.365  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}]
2022-05-20 01:31:28.448  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [loadFileToDatabaseStep]
2022-05-20 01:31:28.581  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [loadFileToDatabaseStep] executed in 132ms
2022-05-20 01:31:28.658  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [deleteFileStep]
2022-05-20 01:31:28.834  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [deleteFileStep] executed in 176ms
2022-05-20 01:31:28.939  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}] and the following status: [COMPLETED] in 538ms
2022-05-20 01:31:28.940  INFO 18890 --- [   scheduling-1] job execution result                     : GenericMessage [payload=JobExecution: id=2, version=2, startTime=Fri May 20 01:31:28 JST 2022, endTime=Fri May 20 01:31:28 JST 2022, lastUpdated=Fri May 20 01:31:28 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=2, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}], headers={file_originalFile=target/files/namino_family.csv, id=f50295b0-dd2b-5d49-6196-d61c5507b3d5, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888940}]

取り蟌たれたデヌタ。

mysql> select * from person;
+----+-----------+--------------+------+---------------------+
| id | last_name | first_name   | age  | updated_time        |
+----+-----------+--------------+------+---------------------+
|  1 | 磯野      | カツオ       |   11 | 2022-05-20 01:22:31 |
|  2 | 磯野      | ワカメ       |    9 | 2022-05-20 01:22:31 |
|  3 | フグ田    | サザ゚       |   23 | 2022-05-20 01:22:31 |
|  4 | フグ田    | マスオ       |   32 | 2022-05-20 01:22:31 |
|  5 | フグ田    | タラオ       |    3 | 2022-05-20 01:22:31 |
|  6 | 波野      | ノリスケ     |   24 | 2022-05-20 01:31:29 |
|  7 | 波野      | タむコ       |   22 | 2022-05-20 01:31:29 |
|  8 | 波野      | むクラ       |    1 | 2022-05-20 01:31:29 |
+----+-----------+--------------+------+---------------------+
8 rows in set (0.00 sec)

ここで、もう1床最初に取り蟌んだファむルをコピヌしおみたす。

$ cp src/main/resources/isono_family.csv target/files

最初ず同じように、Spring Batchが起動したす。

2022-05-20 01:33:40.237  INFO 18890 --- [   scheduling-1] polling file                             : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ef090587-8d4d-0932-7b89-62d2418d8391, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020237}]
2022-05-20 01:33:40.252  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=f2603171-64a4-9093-0621-0125698264db, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020252}]
2022-05-20 01:33:40.320  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}]
2022-05-20 01:33:40.397  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [loadFileToDatabaseStep]
2022-05-20 01:33:40.504  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [loadFileToDatabaseStep] executed in 107ms
2022-05-20 01:33:40.571  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [deleteFileStep]
2022-05-20 01:33:40.631  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [deleteFileStep] executed in 60ms
2022-05-20 01:33:40.691  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 348ms
2022-05-20 01:33:40.692  INFO 18890 --- [   scheduling-1] job execution result                     : GenericMessage [payload=JobExecution: id=3, version=2, startTime=Fri May 20 01:33:40 JST 2022, endTime=Fri May 20 01:33:40 JST 2022, lastUpdated=Fri May 20 01:33:40 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=3, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=23cb8db4-88cd-5bfb-6557-aca9275a8b1f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020691}]

Jobの構成にRunIdIncrementerを含めおいるので、run.idがむンクリメントされおいるので䞎えるファむルパスが同じでも問題なく
起動したす。

2022-05-20 01:33:40.252  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=f2603171-64a4-9093-0621-0125698264db, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020252}]

RunIdIncrementerを含めずに同じJobParametersでJobを起動した堎合は、Spring Batchの実行に倱敗するこずになりたす。

同じデヌタのupdated_timeが曎新されおいるこずも確認しおおきたす。

mysql> select * from person;
+----+-----------+--------------+------+---------------------+
| id | last_name | first_name   | age  | updated_time        |
+----+-----------+--------------+------+---------------------+
|  1 | 磯野      | カツオ       |   11 | 2022-05-20 01:33:40 |
|  2 | 磯野      | ワカメ       |    9 | 2022-05-20 01:33:40 |
|  3 | フグ田    | サザ゚       |   23 | 2022-05-20 01:33:40 |
|  4 | フグ田    | マスオ       |   32 | 2022-05-20 01:33:40 |
|  5 | フグ田    | タラオ       |    3 | 2022-05-20 01:33:40 |
|  6 | 波野      | ノリスケ     |   24 | 2022-05-20 01:31:29 |
|  7 | 波野      | タむコ       |   22 | 2022-05-20 01:31:29 |
|  8 | 波野      | むクラ       |    1 | 2022-05-20 01:31:29 |
+----+-----------+--------------+------+---------------------+
8 rows in set (0.00 sec)

OKですね。

これで、Spring BatchずSpring Integrationの組み合わせで動䜜確認できたした。

オマケRunIdIncrementerの代わりにJsrJobParametersConverterを䜿った堎合は

今回、RunIdIncrementerを䜿甚したしたがJsrJobParametersConverterを䜿っおもよいのではずいう疑問を持ったりはしたす。

RunIdIncrementerをコメントアりトしお

    @Bean
    public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory
                .get("loadFileToDatabaseJob")
                // .incrementer(new RunIdIncrementer())
                .start(loadFileToDatabaseStep(null))
                .next(deleteFileStep(null))
                .build();
    }

JsrJobParametersConverterをBeanずしお定矩しおもよいのではないのでしょうか。

    @Bean
    public JsrJobParametersConverter jsrJobParametersConverter(DataSource dataSource) {
        return new JsrJobParametersConverter(dataSource);
    }

これは、うたくいきたせん。

この定矩方法でJsrJobParametersConverterが䜿われるのは、Spring BatchをJobLauncherApplicationRunnerを䜿っお起動した堎合ですね。

https://github.com/spring-projects/spring-boot/blob/v2.6.7/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunner.java

もしくは、JsrJobOperatorを䜿っおもよさそうではありたす。

https://github.com/spring-projects/spring-batch/blob/4.3.5/spring-batch-core/src/main/java/org/springframework/batch/core/jsr/launch/JsrJobOperator.java

Spring Batch Integrationを介しお実行できなさそうではありたすが 。

たずめ

Spring BatchずSpring Integrationを組み合わせた、Spring Batch Integrationを詊しおみたした。

ドキュメントが少ないのでSpring BatchずSpring Integrationの぀なぎ蟌みはやや苊劎したしたが、そこをクリアすればSpring Batchず
Spring Integrationそれぞれに知識があれば簡単に䜿える感じがしたしたね。

Spring IntegrationのInbound Channel Adapterを介しお取埗するMessageの単䜍がSpring BatchのJobの起動単䜍になるので、
その粒床で倧䞈倫ならSpring Integrationの機胜が䜿えるのでSpring Batchを䜿った衚珟範囲が倧きくなるのはよくわかりたした。

芚えおおくずよさそうですね。