CLOVER🍀

That was when it all began.

Spring Batch × Spring IntegrationSpring Batch Integrationを詊す

これは、なにをしたくお曞いたもの

Spring Batchのドキュメントを芋おいお、Spring Integrationず組み合わせられそうだったので、ちょっず詊しおみようかなず。

Spring Batch Integration

Spring Batchず、Spring Integrationの組み合わせに぀いおは、こちらに蚘茉がありたす。

Spring Batch Integration

Spring Batch Integrationず呌ぶようです。

Spring Batchのナヌザヌ、Spring Integrationのナヌザヌずもに、双方を合わせお䜿うこずで芁件をより効率的に実珟できるケヌスに
遭遇する可胜性がある、ずいう話のようです。

Many users of Spring Batch may encounter requirements that are outside the scope of Spring Batch but that may be efficiently and concisely implemented by using Spring Integration. Conversely, Spring Integration users may encounter Spring Batch requirements and need a way to efficiently integrate both frameworks.

Spring Batch Integration / Spring Batch Integration Introduction

バッチプロセスにメッセヌゞングを远加するこずで、オペレヌションの自動化ず、キヌずなる懞念事項の分離が可胜になりたす。

Adding messaging to a batch process enables automation of operations and also separation and strategizing of key concerns.

ざっくり蚀うず、メッセヌゞングの仕組みからSpring BatchのJobを起動するためのもの、みたいです。
たた、メッセヌゞングの仕組みをゞョブに埋め蟌み、ワヌクロヌドを耇数のワヌカヌリモヌトパヌティショニング、リモヌトチャンクに
分散したりもできるようです。

メッセヌゞングを䜿ったSpring Batchの起動に぀いお。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages

Spring BatchをCommandLineJobRunnerを䜿っお起動したり、WebアプリケヌションにおいおJobOperatorを盎接扱う䜿い方も
ありたすが、より耇雑なナヌスケヌスに぀いお挙げおみたす。

バッチゞョブのデヌタを取埗するために、リモヌトのFTPやSFTPサヌバヌぞのポヌリングを行う必芁があったり、耇数の異なる
デヌタ゜ヌスをサポヌトする必芁があるかもしれたせん。

Maybe you need to poll a remote (S)FTP server to retrieve the data for the Batch Job or your application has to support multiple different data sources simultaneously.

Webだけでなく、FTPなどから゜ヌスずなるデヌタファむルを受け取るかもしれたせん。Spring Batchを呌び出す前に、入力ファむルの
倉換が必芁になる堎合もあるかもしれたせん。

For example, you may receive data files not only from the web, but also from FTP and other sources. Maybe additional transformation of the input files is needed before invoking Spring Batch.

このような堎合は、Spring Integrationずその倚数のアダプタヌを䜿っおバッチゞョブを実行するず、より匷力な仕組みになるでしょう。

Therefore, it would be much more powerful to execute the batch job using Spring Integration and its numerous adapters.

たずえば、File Inbound Channel Adapterを䜿甚しおファむルシステム内のディレクトリを監芖し、入力ファむルが到着したらすぐに
バッチゞョブを開始するこずができたす。さらに、耇数の異なるアダプタヌを䜿甚しおSpring Integrationのフロヌを䜜成し、
Configurationのみで耇数の゜ヌスから同時にバッチゞョブのデヌタを取り蟌むこずもできたす。

For example, you can use a File Inbound Channel Adapter to monitor a directory in the file-system and start the Batch Job as soon as the input file arrives. Additionally, you can create Spring Integration flows that use multiple different adapters to easily ingest data for your batch jobs from multiple sources simultaneously using only configuration.

この仕組みを実珟するために、Spring Batch Integrationでは以䞋を提䟛したす。

  • バッチゞョブの起動を行うJobLaunchingMessageHandler
  • JobLaunchingMessageHandlerの入力ずなるペむロヌドを持぀JobLaunchRequest

JobLaunchRequestは、バッチゞョブを起動するのに必芁なJobParametersずJobのラッパヌです。

あずは、Spring IntegrationのMessageからJobLaunchRequestぞ倉換する方法、

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / Transforming a file into a JobLaunchRequest

JobExecutionに぀いおゞョブのリポゞトリを参照しおステヌタスを確認するこず、

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / The JobExecution Response

Spring Batch Integrationの蚭定䟋、

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / Spring Batch Integration Configuration

ゞョブの䞭で䜿われるItemReaderの蚭定䟋ずいったものが続きたす。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / Example ItemReader Configuration

ず、ドキュメントだけを眺めおいおもわからないので、実際に䜿っおいっおみたしょう。

お題

今回のお題は、このようにしたす。

  • 特定のディレクトリをポヌリングしお、CSVファむルの配眮を監芖
  • CSVファむルが配眮されたら、Spring Batchを起動
  • Spring Batchのゞョブは、CSVファむルを読み蟌むItemReaderず、読み蟌んだデヌタをデヌタベヌスに反映するItemWriterで構成
    • デヌタベヌスぞの反映にはJPAを䜿甚
  • 同じファむルを眮いおも、ゞョブを起動しお䞊曞き曎新する
    • 䞊曞きしたこずがわかるように、デヌタベヌス䞊のテヌブルには曎新時間を蚭ける

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.3 2022-04-19
OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-110-generic", arch: "amd64", family: "unix"

デヌタベヌスには、MySQLを䜿甚したす。MySQLは172.17.0.2で動䜜しおいるものずしたす。

$ mysql --version
mysql  Ver 8.0.29 for Linux on x86_64 (MySQL Community Server - GPL)

プロゞェクトを䜜成する

たずは、Spring Bootプロゞェクトを䜜成したす。䟝存関係にはbatch、integration、data-jpa、mysqlを指定。

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.6.7 \
  -d javaVersion=17 \
  -d name=batch-integration-example \
  -d groupId=org.littlewings \
  -d artifactId=batch-integration-example \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.batch.integration \
  -d dependencies=batch,integration,data-jpa,mysql \
  -d baseDir=batch-integration-example | tar zxvf -

プロゞェクト内ぞ移動。

$ cd batch-integration-example

Maven䟝存関係およびプラグむン蚭定。

        <properties>
                <java.version>17</java.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-batch</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-data-jpa</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-integration</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-jpa</artifactId>
                </dependency>

                <dependency>
                        <groupId>mysql</groupId>
                        <artifactId>mysql-connector-java</artifactId>
                        <scope>runtime</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.batch</groupId>
                        <artifactId>spring-batch-test</artifactId>
                        <scope>test</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-test</artifactId>
                        <scope>test</scope>
                </dependency>
        </dependencies>

        <build>
                <plugins>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
                        </plugin>
                </plugins>
        </build>

生成された゜ヌスコヌドは削陀しおおきたす。

$ rm src/main/java/org/littlewings/spring/batch/integration/BatchIntegrationExampleApplication.java src/test/java/org/littlewings/spring/batch/integration/BatchIntegrationExampleApplicationTests.java

今回䜿いたい䟝存ラむブラリが足りないのず、䜿わないものもあるので少し倉曎。

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <!--
       <dependency>
           <groupId>org.springframework.integration</groupId>
           <artifactId>spring-integration-jpa</artifactId>
       </dependency>
       -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

たず、今回の䞻題であるspring-batch-integrationを远加、それからSpring Integrationを䜿ったファむル監芖のためにspring-integration-fileを远加。
そしお、spring-integration-jpaは䜿わないのでコメントアりトしおおきたす。

テヌブル定矩は、このようにしおおきたす。

src/main/resources/schema.sql

drop table if exists person;

create table person (
  id integer,
  last_name varchar(10),
  first_name varchar(10),
  age integer,
  updated_time datetime,
  primary key(id)
);

アプリケヌションの蚭定はこちら。

src/main/resources/application.properties

spring.datasource.url=jdbc:mysql://172.17.0.2:3306/practice?characterEncoding=utf-8
spring.datasource.username=kazuhira
spring.datasource.password=password

spring.sql.init.mode=always
spring.batch.jdbc.initialize-schema=always

spring.batch.job.enabled=false

schema.sqlは垞に適甚するようにしお、Spring Batchが䜿甚するテヌブルも䜜成するようにしたす。

もうひず぀ポむントがあるのですが、それはたた説明したす。

アプリケヌションを䜜成する

では、アプリケヌションを䜜成しおいきたしょう。

たずはmainクラスの䜜成。

src/main/java/org/littlewings/spring/batch/integration/App.java

package org.littlewings.spring.batch.integration;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

@EnableBatchProcessingアノテヌションは必芁です。Spring BatchのJobBuilderFactoryやStepBuilderFactoryなどが
AutoConfigurationされるようにする必芁があるので。

JPAの゚ンティティクラス。

src/main/java/org/littlewings/spring/batch/integration/Person.java

package org.littlewings.spring.batch.integration;

import java.time.LocalDateTime;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "person")
public class Person {
    @Id
    @Column(name = "id")
    Integer id;

    @Column(name = "last_name")
    String lastName;

    @Column(name = "first_name")
    String firstName;

    @Column(name = "age")
    Integer age;

    @Column(name = "updated_time")
    LocalDateTime updatedTime;

    // gettersetterは省略
}

DDLには存圚しなかったupdatedTimeずいうフィヌルドがありたすが、こちらはItemProcessorで蚭定するこずにしたす。

ここからは、こちらを芋ながらSpring IntegrationやSpring Batchの蚭定を行っおいきたす。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / Spring Batch Integration Configuration

最初は@Configurationを付䞎したクラスの雛圢を䜜成。

src/main/java/org/littlewings/spring/batch/integration/IntegrationJobConfig.java

package org.littlewings.spring.batch.integration;

import java.io.File;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
import javax.persistence.EntityManagerFactory;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.integration.launch.JobLaunchingGateway;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.PathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.handler.LoggingHandler;

@Configuration
public class IntegrationJobConfig {

    // 埌で
}

CSVファむルの監芖から曞いおいきたしょう。Spring IntegrationのFile Supportを䜿いたす。

File Support

    @Bean
    public IntegrationFlow integrationFlow() throws Exception {
        return IntegrationFlows
                .from(
                        Files
                                .inboundAdapter(new File("target/files"))
                                .filter(new SimplePatternFileListFilter("*.csv")),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L))
                )
                .log(LoggingHandler.Level.INFO, "polling file")
                .transform(fileMessageToJobRequest(null))
                .log(LoggingHandler.Level.INFO, "transform job request")
                .handle(jobLaunchingGateway(null))
                .log(LoggingHandler.Level.INFO, "job execution result")
                .get();
    }

target/filesディレクトリ配䞋の.csv拡匵子のファむルを、1秒おきに監芖したす。たた、1回で取埗するファむルはひず぀にしおいたす。

        return IntegrationFlows
                .from(
                        Files
                                .inboundAdapter(new File("target/files"))
                                .filter(new SimplePatternFileListFilter("*.csv")),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L))
                )

ファむルを怜出したら、transformした埌に凊理を行いたすhandle。各ステップの間にはログ出力を行うようにしたしょう。
logメ゜ッドの第2匕数はロガヌ名です。

                .log(LoggingHandler.Level.INFO, "polling file")
                .transform(fileMessageToJobRequest(null))
                .log(LoggingHandler.Level.INFO, "transform job request")
                .handle(jobLaunchingGateway(null))
                .log(LoggingHandler.Level.INFO, "job execution result")
                .get();

transformメ゜ッドに枡しおいるのは、Spring IntegrationのMessageをSpring BatchのJobに倉換するTransformerです。

ですが、その前にhandleメ゜ッドに枡しおいる凊理から芋おいきたしょう。

こちらでは、JobLaunchingGatewayを䜜成しおいたす。

    @Bean
    public JobLaunchingGateway jobLaunchingGateway(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SyncTaskExecutor());
        jobLauncher.afterPropertiesSet();

        return new JobLaunchingGateway(jobLauncher);
    }

JobLaunchingGatewayは、Spring BatchのJobを実行するためのMessageHandlerむンタヌフェヌスの実装です。

JobLaunchingGateway (Spring Batch 4.3.5 API)

このため、JobLauncherのむンスタンスを必芁ずしたす。

Configuring and Running a Job / Configuring a JobLauncher

ここから先は、Spring BatchのJobを構成しおいきたす。

Jobの定矩。同じJobParametersの指定でも耇数回起動できるように、RunIdIncrementerを指定しおいたす。

    @Bean
    public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory
                .get("loadFileToDatabaseJob")
                .incrementer(new RunIdIncrementer())
                .start(loadFileToDatabaseStep(null))
                .next(deleteFileStep(null))
                .build();
    }

Stepは2぀蚭けたした。

ひず぀目のStepは、ファむルを読み蟌んでデヌタベヌスに曞き蟌むものです。

    @Bean
    public Step loadFileToDatabaseStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory
                .get("loadFileToDatabaseStep")
                .<Person, Person>chunk(3)
                .reader(flatFilePersonItemReader(null))
                .processor(updateTimeProcessor())
                .writer(jpaPersonItemWriter(null))
                .build();
    }

ItemReader、ItemProcessor、ItemWriterの定矩はこちら。
CSVファむルを読み蟌む → ゚ンティティにupdatedTimeを蚭定する → JPAでデヌタベヌスに曞き蟌む、ずいう流れになっおいたす。

    @Bean
    @StepScope
    public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {
        Resource resource = new PathResource(path);

        return new FlatFileItemReaderBuilder<Person>()
                .name("flatFilePersonItemReader")
                .resource(resource)
                .encoding("UTF-8")
                .delimited()
                .names(new String[]{"id", "lastName", "firstName", "age"})
                .linesToSkip(1)
                .targetType(Person.class)
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<Person, Person> updateTimeProcessor() {
        return (person) -> {
            person.setUpdatedTime(LocalDateTime.now());
            return person;
        };
    }

    @Bean
    @StepScope
    public JpaItemWriter<Person> jpaPersonItemWriter(EntityManagerFactory entityManagerFactory) {
        return new JpaItemWriterBuilder<Person>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }

入力ずなるCSVファむルのパスは、JobParametersずしお取埗したす。

    @Bean
    @StepScope
    public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {

最埌は、読み蟌んだファむルを削陀するStepです。ファむルを削陀する凊理を入れないず、監芖察象のディレクトリにファむルが
残り続けるので、デヌタの取り蟌み凊理を延々ず繰り返しおしたいたす。

    @Bean
    public Step deleteFileStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory
                .get("deleteFileStep")
                .tasklet(deleteFileTasklet(null))
                .build();
    }

ファむルの削陀を行うTasklet。このようなStepではなく違う方法でもファむルの削陀はできるず思うのですが、どこかしらに削陀の
凊理を曞かないずいけなさそうなこずは倉わらないので、今回はこの方法を遞択したした。

    @Bean
    @StepScope
    public Tasklet deleteFileTasklet(@Value("#{jobParameters['input.file.path']}") String path) {
        return (contribution, context) -> {
            java.nio.file.Files.delete(Paths.get(path));
            return RepeatStatus.FINISHED;
        };
    }

そしお、このSpring Batchで定矩したJobず

    @Bean
    public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory
                .get("loadFileToDatabaseJob")
                .incrementer(new RunIdIncrementer())
                .start(loadFileToDatabaseStep(null))
                .next(deleteFileStep(null))
                .build();
    }

Jobを実行するためにSpring IntegrationのMessageをJobに倉換するのが、Transformerでした。

    public IntegrationFlow integrationFlow() throws Exception {
        return IntegrationFlows
                .from(
                        Files
                                .inboundAdapter(new File("target/files"))
                                .filter(new SimplePatternFileListFilter("*.csv")),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L))
                )
                .log(LoggingHandler.Level.INFO, "polling file")
                .transform(fileMessageToJobRequest(null))
                .log(LoggingHandler.Level.INFO, "transform job request")
                .handle(jobLaunchingGateway(null))
                .log(LoggingHandler.Level.INFO, "job execution result")
                .get();

その凊理の内容は、こちら。

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest(JobExplorer jobExplorer) {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null));
        fileMessageToJobRequest.setJobExplorer(jobExplorer);

        return fileMessageToJobRequest;
    }

参考にしおいるドキュメントは、こちらですね。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / The JobExecution Response

JobおよびJobExplorerを蚭定しおいたす。

        fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null));
        fileMessageToJobRequest.setJobExplorer(jobExplorer);

クラスの定矩は、こちら。

src/main/java/org/littlewings/spring/batch/integration/FileMessageToJobRequest.java

package org.littlewings.spring.batch.integration;

import java.io.File;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

public class FileMessageToJobRequest {
    Job job;
    JobExplorer jobExplorer;

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(jobExplorer);  // for JobParametersIncrementer

        jobParametersBuilder.getNextJobParameters(job)  // for JobParametersIncrementer
                .addString("input.file.path", message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }

    public Job getJob() {
        return job;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    public JobExplorer getJobExplorer() {
        return jobExplorer;
    }

    public void setJobExplorer(JobExplorer jobExplorer) {
        this.jobExplorer = jobExplorer;
    }
}

ポむントは、こちらのメ゜ッドです。@Transformerアノテヌションを付䞎しお、Spring IntegrationのFile Supportを䜿っお怜出した
CSVファむルMessage<File>をJobLaunchRequestに倉換したす。

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(jobExplorer);  // for JobParametersIncrementer

        jobParametersBuilder.getNextJobParameters(job)  // for JobParametersIncrementer
                .addString("input.file.path", message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }

CSVファむルのパスは、この郚分でJobParametersずしお蚭定しおいたす。

                .addString("input.file.path", message.getPayload().getAbsolutePath());

ここでの指定が、こちらに枡されるわけですね。

    @Bean
    @StepScope
    public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {

ずころでサンプルコヌドを芋るず、JobExplorerは指定しおいないのですが。

Spring Batch Integration / Spring Batch Integration Introduction / Launching Batch Jobs through Messages / The JobExecution Response

今回JobParametersIncrementerRunIdIncrementerを䜿ったのでJobParametersBuilderにJobExplorerの蚭定および、
JobParametersBuilder#getNextJobParameters`の呌び出しが必芁になりたす。

もし、RunIdIncrementerなしでできる限り同じJobParametersで実行しようずするず、以䞋のようにJobの実行の床に異なる
JobParamtersを蚭定するようなこずをする必芁が出おくるでしょうね。

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder
                .addString("input.file.path", message.getPayload().getAbsolutePath())
                .addString("job.start.time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss")));

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }

これで、Spring BatchおよびSpring Integrationの蚭定はできたした。

Spring BatchのJobおよびSpring IntegrationのIntegrationFlowを定矩しおいるクラス党䜓を茉せるず、こんな感じになっおいたす。

src/main/java/org/littlewings/spring/batch/integration/IntegrationJobConfig.java

package org.littlewings.spring.batch.integration;

import java.io.File;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
import javax.persistence.EntityManagerFactory;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.integration.launch.JobLaunchingGateway;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.PathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.handler.LoggingHandler;

@Configuration
public class IntegrationJobConfig {
    @Bean
    public IntegrationFlow integrationFlow() throws Exception {
        return IntegrationFlows
                .from(
                        Files
                                .inboundAdapter(new File("target/files"))
                                .filter(new SimplePatternFileListFilter("*.csv")),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L))
                )
                .log(LoggingHandler.Level.INFO, "polling file")
                .transform(fileMessageToJobRequest(null))
                .log(LoggingHandler.Level.INFO, "transform job request")
                .handle(jobLaunchingGateway(null))
                .log(LoggingHandler.Level.INFO, "job execution result")
                .get();
    }

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest(JobExplorer jobExplorer) {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null));
        fileMessageToJobRequest.setJobExplorer(jobExplorer);

        return fileMessageToJobRequest;
    }

    @Bean
    public JobLaunchingGateway jobLaunchingGateway(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SyncTaskExecutor());
        jobLauncher.afterPropertiesSet();

        return new JobLaunchingGateway(jobLauncher);
    }

    @Bean
    public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory
                .get("loadFileToDatabaseJob")
                .incrementer(new RunIdIncrementer())
                .start(loadFileToDatabaseStep(null))
                .next(deleteFileStep(null))
                .build();
    }

    @Bean
    public Step loadFileToDatabaseStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory
                .get("loadFileToDatabaseStep")
                .<Person, Person>chunk(3)
                .reader(flatFilePersonItemReader(null))
                .processor(updateTimeProcessor())
                .writer(jpaPersonItemWriter(null))
                .build();
    }

    @Bean
    public Step deleteFileStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory
                .get("deleteFileStep")
                .tasklet(deleteFileTasklet(null))
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {
        Resource resource = new PathResource(path);

        return new FlatFileItemReaderBuilder<Person>()
                .name("flatFilePersonItemReader")
                .resource(resource)
                .encoding("UTF-8")
                .delimited()
                .names(new String[]{"id", "lastName", "firstName", "age"})
                .linesToSkip(1)
                .targetType(Person.class)
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<Person, Person> updateTimeProcessor() {
        return (person) -> {
            person.setUpdatedTime(LocalDateTime.now());
            return person;
        };
    }

    @Bean
    @StepScope
    public JpaItemWriter<Person> jpaPersonItemWriter(EntityManagerFactory entityManagerFactory) {
        return new JpaItemWriterBuilder<Person>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }

    @Bean
    @StepScope
    public Tasklet deleteFileTasklet(@Value("#{jobParameters['input.file.path']}") String path) {
        return (contribution, context) -> {
            java.nio.file.Files.delete(Paths.get(path));
            return RepeatStatus.FINISHED;
        };
    }
}

動䜜確認しおみる

では、䜜成したアプリケヌションを動かしおみたしょう。

読み蟌むCSVファむルを、2぀甚意したす。

src/main/resources/isono_family.csv

id,last_name,first_name,age
1,磯野,カツオ,11
2,磯野,ワカメ,9
3,フグ田,サザ゚,23
4,フグ田,マスオ,32
5,フグ田,タラオ,3

src/main/resources/namino_family.csv

id,last_name,first_name,age
6,波野,ノリスケ,24
7,波野,タむコ,22
8,波野,むクラ,1

なんずなくsrc/main/resourcesに眮いおいるのでクラスパス䞊に存圚しおいたすが、クラスパスを監芖したいわけではありたせん。

パッケヌゞングしお

$ mvn package

起動。

$ java -jar target/batch-integration-example-0.0.1-SNAPSHOT.jar

Webアプリケヌションではないのですが、このたたサヌバヌずしお起動しおくれおいたす。

この時、src/main/resources/application.propertiesに以䞋の蚭定を入れおいたした。

spring.batch.job.enabled=false

これを入れおいない堎合は、Spring Batchのデフォルトの動䜜ずしおすべおのJobをしようずするので泚意が必芁です。 今回はSpring Batch Integrationを䜿ったJobしかなく、か぀ディレクトリ監芖でJobを起動したいのでspring.batch.job.enabledをfalseずしお
アプリケヌション起動時にJobを実行しないようにしたした。

なお、ディレクトリ監芖を行うず、監芖察象のディレクトリがない堎合は䜜成しようずするみたいですね。

$ ll target/files
合蚈 8
drwxrwxr-x 2 xxxxx xxxxx 4096  5月 20 01:19 ./
drwxrwxr-x 9 xxxxx xxxxx 4096  5月 20 01:19 ../

最初にテヌブルの状態を確認。

mysql> select * from person;
Empty set (0.00 sec)

1ファむル目を監芖察象のディレクトリにコピヌしおみたす。

$ cp src/main/resources/isono_family.csv target/files

するず、アプリケヌションが動き始めたす。

2022-05-20 01:22:30.249  INFO 18890 --- [   scheduling-1] polling file                             : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ddfff35a-5d0c-89ed-905e-f00d2fc00a6c, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350247}]
2022-05-20 01:22:30.249  INFO 18890 --- [   scheduling-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2022-05-20 01:22:30.307  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=37b5abff-b5b8-61ab-e0c1-d73813bf4c2f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350307}]
2022-05-20 01:22:30.509  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}]
2022-05-20 01:22:30.668  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [loadFileToDatabaseStep]
2022-05-20 01:22:30.975  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [loadFileToDatabaseStep] executed in 306ms
2022-05-20 01:22:31.061  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [deleteFileStep]
2022-05-20 01:22:31.169  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [deleteFileStep] executed in 108ms
2022-05-20 01:22:31.224  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 668ms
2022-05-20 01:22:31.231  INFO 18890 --- [   scheduling-1] job execution result                     : GenericMessage [payload=JobExecution: id=1, version=2, startTime=Fri May 20 01:22:30 JST 2022, endTime=Fri May 20 01:22:31 JST 2022, lastUpdated=Fri May 20 01:22:31 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=82978876-8ca8-12d7-4fe6-9faeaf0312ec, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977351224}]

ログを芋るず、ファむルを怜出しお

2022-05-20 01:22:30.249  INFO 18890 --- [   scheduling-1] polling file                             : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ddfff35a-5d0c-89ed-905e-f00d2fc00a6c, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350247}]

Jobに倉換。

2022-05-20 01:22:30.307  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=37b5abff-b5b8-61ab-e0c1-d73813bf4c2f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350307}]

Spring BatchのJobを実行。

2022-05-20 01:22:30.668  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [loadFileToDatabaseStep]
2022-05-20 01:22:30.975  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [loadFileToDatabaseStep] executed in 306ms
2022-05-20 01:22:31.061  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [deleteFileStep]
2022-05-20 01:22:31.169  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [deleteFileStep] executed in 108ms
2022-05-20 01:22:31.224  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 668ms

終了、ずいう様子がわかりたす。

2022-05-20 01:22:31.231  INFO 18890 --- [   scheduling-1] job execution result                     : GenericMessage [payload=JobExecution: id=1, version=2, startTime=Fri May 20 01:22:30 JST 2022, endTime=Fri May 20 01:22:31 JST 2022, lastUpdated=Fri May 20 01:22:31 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=82978876-8ca8-12d7-4fe6-9faeaf0312ec, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977351224}]

デヌタを確認しおみたしょう。

mysql> select * from person;
+----+-----------+------------+------+---------------------+
| id | last_name | first_name | age  | updated_time        |
+----+-----------+------------+------+---------------------+
|  1 | 磯野      | カツオ     |   11 | 2022-05-20 01:22:31 |
|  2 | 磯野      | ワカメ     |    9 | 2022-05-20 01:22:31 |
|  3 | フグ田    | サザ゚     |   23 | 2022-05-20 01:22:31 |
|  4 | フグ田    | マスオ     |   32 | 2022-05-20 01:22:31 |
|  5 | フグ田    | タラオ     |    3 | 2022-05-20 01:22:31 |
+----+-----------+------------+------+---------------------+
5 rows in set (0.00 sec)

ちゃんず取り蟌たれおいたすね。

監芖察象のディレクトリに配眮したファむルは、Spring BatchのJobに含たれおいたTaskletにより削陀されおいたす。

$ tree target/files
target/files

0 directories, 0 files

もうひず぀のファむルを、監芖察象のディレクトリにコピヌしおみたす。

$ cp src/main/resources/namino_family.csv target/files

先ほどず同様、配眮されたファむルを怜出しおSpring Batchが起動したした。

2022-05-20 01:31:28.237  INFO 18890 --- [   scheduling-1] polling file                             : GenericMessage [payload=target/files/namino_family.csv, headers={file_originalFile=target/files/namino_family.csv, id=2e9cac33-a5be-8eeb-a471-d47f579906cc, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888237}]
2022-05-20 01:31:28.278  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}, headers={file_originalFile=target/files/namino_family.csv, id=c404ecd5-4b5a-2593-dc12-13b0621b3b54, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888278}]
2022-05-20 01:31:28.365  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}]
2022-05-20 01:31:28.448  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [loadFileToDatabaseStep]
2022-05-20 01:31:28.581  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [loadFileToDatabaseStep] executed in 132ms
2022-05-20 01:31:28.658  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [deleteFileStep]
2022-05-20 01:31:28.834  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [deleteFileStep] executed in 176ms
2022-05-20 01:31:28.939  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}] and the following status: [COMPLETED] in 538ms
2022-05-20 01:31:28.940  INFO 18890 --- [   scheduling-1] job execution result                     : GenericMessage [payload=JobExecution: id=2, version=2, startTime=Fri May 20 01:31:28 JST 2022, endTime=Fri May 20 01:31:28 JST 2022, lastUpdated=Fri May 20 01:31:28 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=2, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}], headers={file_originalFile=target/files/namino_family.csv, id=f50295b0-dd2b-5d49-6196-d61c5507b3d5, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888940}]

取り蟌たれたデヌタ。

mysql> select * from person;
+----+-----------+--------------+------+---------------------+
| id | last_name | first_name   | age  | updated_time        |
+----+-----------+--------------+------+---------------------+
|  1 | 磯野      | カツオ       |   11 | 2022-05-20 01:22:31 |
|  2 | 磯野      | ワカメ       |    9 | 2022-05-20 01:22:31 |
|  3 | フグ田    | サザ゚       |   23 | 2022-05-20 01:22:31 |
|  4 | フグ田    | マスオ       |   32 | 2022-05-20 01:22:31 |
|  5 | フグ田    | タラオ       |    3 | 2022-05-20 01:22:31 |
|  6 | 波野      | ノリスケ     |   24 | 2022-05-20 01:31:29 |
|  7 | 波野      | タむコ       |   22 | 2022-05-20 01:31:29 |
|  8 | 波野      | むクラ       |    1 | 2022-05-20 01:31:29 |
+----+-----------+--------------+------+---------------------+
8 rows in set (0.00 sec)

ここで、もう1床最初に取り蟌んだファむルをコピヌしおみたす。

$ cp src/main/resources/isono_family.csv target/files

最初ず同じように、Spring Batchが起動したす。

2022-05-20 01:33:40.237  INFO 18890 --- [   scheduling-1] polling file                             : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ef090587-8d4d-0932-7b89-62d2418d8391, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020237}]
2022-05-20 01:33:40.252  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=f2603171-64a4-9093-0621-0125698264db, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020252}]
2022-05-20 01:33:40.320  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}]
2022-05-20 01:33:40.397  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [loadFileToDatabaseStep]
2022-05-20 01:33:40.504  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [loadFileToDatabaseStep] executed in 107ms
2022-05-20 01:33:40.571  INFO 18890 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [deleteFileStep]
2022-05-20 01:33:40.631  INFO 18890 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [deleteFileStep] executed in 60ms
2022-05-20 01:33:40.691  INFO 18890 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 348ms
2022-05-20 01:33:40.692  INFO 18890 --- [   scheduling-1] job execution result                     : GenericMessage [payload=JobExecution: id=3, version=2, startTime=Fri May 20 01:33:40 JST 2022, endTime=Fri May 20 01:33:40 JST 2022, lastUpdated=Fri May 20 01:33:40 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=3, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=23cb8db4-88cd-5bfb-6557-aca9275a8b1f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020691}]

Jobの構成にRunIdIncrementerを含めおいるので、run.idがむンクリメントされおいるので䞎えるファむルパスが同じでも問題なく
起動したす。

2022-05-20 01:33:40.252  INFO 18890 --- [   scheduling-1] transform job request                    : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=f2603171-64a4-9093-0621-0125698264db, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020252}]

RunIdIncrementerを含めずに同じJobParametersでJobを起動した堎合は、Spring Batchの実行に倱敗するこずになりたす。

同じデヌタのupdated_timeが曎新されおいるこずも確認しおおきたす。

mysql> select * from person;
+----+-----------+--------------+------+---------------------+
| id | last_name | first_name   | age  | updated_time        |
+----+-----------+--------------+------+---------------------+
|  1 | 磯野      | カツオ       |   11 | 2022-05-20 01:33:40 |
|  2 | 磯野      | ワカメ       |    9 | 2022-05-20 01:33:40 |
|  3 | フグ田    | サザ゚       |   23 | 2022-05-20 01:33:40 |
|  4 | フグ田    | マスオ       |   32 | 2022-05-20 01:33:40 |
|  5 | フグ田    | タラオ       |    3 | 2022-05-20 01:33:40 |
|  6 | 波野      | ノリスケ     |   24 | 2022-05-20 01:31:29 |
|  7 | 波野      | タむコ       |   22 | 2022-05-20 01:31:29 |
|  8 | 波野      | むクラ       |    1 | 2022-05-20 01:31:29 |
+----+-----------+--------------+------+---------------------+
8 rows in set (0.00 sec)

OKですね。

これで、Spring BatchずSpring Integrationの組み合わせで動䜜確認できたした。

オマケRunIdIncrementerの代わりにJsrJobParametersConverterを䜿った堎合は

今回、RunIdIncrementerを䜿甚したしたがJsrJobParametersConverterを䜿っおもよいのではずいう疑問を持ったりはしたす。

RunIdIncrementerをコメントアりトしお

    @Bean
    public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory
                .get("loadFileToDatabaseJob")
                // .incrementer(new RunIdIncrementer())
                .start(loadFileToDatabaseStep(null))
                .next(deleteFileStep(null))
                .build();
    }

JsrJobParametersConverterをBeanずしお定矩しおもよいのではないのでしょうか。

    @Bean
    public JsrJobParametersConverter jsrJobParametersConverter(DataSource dataSource) {
        return new JsrJobParametersConverter(dataSource);
    }

これは、うたくいきたせん。

この定矩方法でJsrJobParametersConverterが䜿われるのは、Spring BatchをJobLauncherApplicationRunnerを䜿っお起動した堎合ですね。

https://github.com/spring-projects/spring-boot/blob/v2.6.7/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherApplicationRunner.java

もしくは、JsrJobOperatorを䜿っおもよさそうではありたす。

https://github.com/spring-projects/spring-batch/blob/4.3.5/spring-batch-core/src/main/java/org/springframework/batch/core/jsr/launch/JsrJobOperator.java

Spring Batch Integrationを介しお実行できなさそうではありたすが 。

たずめ

Spring BatchずSpring Integrationを組み合わせた、Spring Batch Integrationを詊しおみたした。

ドキュメントが少ないのでSpring BatchずSpring Integrationの぀なぎ蟌みはやや苊劎したしたが、そこをクリアすればSpring Batchず
Spring Integrationそれぞれに知識があれば簡単に䜿える感じがしたしたね。

Spring IntegrationのInbound Channel Adapterを介しお取埗するMessageの単䜍がSpring BatchのJobの起動単䜍になるので、
その粒床で倧䞈倫ならSpring Integrationの機胜が䜿えるのでSpring Batchを䜿った衚珟範囲が倧きくなるのはよくわかりたした。

芚えおおくずよさそうですね。

Spring Boot ActuatorでデヌタストアJDBC、Redisを含んだヘルスチェックを行う

これは、なにをしたくお曞いたもの

Spring Boot Actuatorのヘルスチェックを行う時に、䞀緒にデヌタストアなどのヘルスチェックも行っおくれるようなので、
ちょっず芋おみたした。

Spring Boot ActuatorHealth Information

そもそも、Spring Boot Actuatorはずいうずころからですが。

Spring Boot Actuatorは、本番環境でアプリケヌションを動䜜させる際にモニタリング、管理するのに䟿利な機胜を含んだものです。
HTTP゚ンドポむントたたはJMXを䜿甚しお、アプリケヌションを管理したりモニタリングしたりできたす。監査、ヘルスチェック、
メトリクスの収集などを含んでいたす。

Spring Boot includes a number of additional features to help you monitor and manage your application when you push it to production. You can choose to manage and monitor your application by using HTTP endpoints or with JMX. Auditing, health, and metrics gathering can also be automatically applied to your application.

Production-ready Features

Spring Boot Actuatorには、耇数の゚ンドポむントず呌ばれるものがありたす。その䞀芧は、以䞋に蚘茉しおありたす。

Production-ready Features / Endpoints

ヘルスチェックの堎合は、health゚ンドポむントが該圓したす。以降はヘルスチェックhealth゚ンドポむントに絞っおドキュメントを
芋おいきたす。

Spring Boot Actuatorのヘルスチェックに関するドキュメントは、こちら。

Production-ready Features / Endpoints / Health Information

ヘルスチェックに䜿われる情報は、HealthContributorRegistryから収集されたものになるず曞かれおいたす。

Health information is collected from the content of a HealthContributorRegistry (by default, all HealthContributor instances defined in your ApplicationContext).

デフォルトでは、ApplicationContextで定矩されおいるすべおのHealthContributorむンスタンスが含たれおいたす。

HealthContributorはむンタヌフェヌスであり、HealthIndicatorたたはCompositeHealthContributorのどちらかになりたす。
HealthIndicatorは、Statusを含むヘルス情報を提䟛するむンタヌフェヌスです。CompositeHealthContributorは、耇数のHealthContributorsの
組み合わせになりたす。

A HealthContributor can be either a HealthIndicator or a CompositeHealthContributor. A HealthIndicator provides actual health information, including a Status. A CompositeHealthContributor provides a composite of other HealthContributors.

デフォルトでは、ステヌタスの順に各HealthIndicatorのステヌタスを䞊び替えるこずにより、最終的なシステムのヘルス情報は
StatusAggregatorによっお導出されたす。

By default, the final system health is derived by a StatusAggregator, which sorts the statuses from each HealthIndicator based on an ordered list of statuses.

゜ヌトされた結果、最初のステヌタスが党䜓のヘルスチェックの結果ずなりたす。

The first status in the sorted list is used as the overall health status.

Statusずいうのは、こちらですね。

Status (Spring Boot 2.6.7 API)

UP、DOWN、OUT_OF_SERVICE、UNKNOWNの4぀のステヌタスがありたす。

デフォルトの゜ヌト順は、DOWN → OUT_OF_SERVICE → UP → UNKNOWNのようです。

https://github.com/spring-projects/spring-boot/blob/v2.6.7/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/SimpleStatusAggregator.java#L44-L49

぀たり、どれかひず぀でもDOWNを返すHealthIndicatorがあれば、党䜓のヘルスチェックの結果ずしおはDOWNずなるわけですね。

ちなみに、StatusAggregatorが知っおいるステヌタスを返すHealthIndicatorがひず぀もない堎合、党䜓のステヌタスずしおはUNKNOWNに
なるようです。

If no HealthIndicator returns a status that is known to the StatusAggregator, an UNKNOWN status is used.

たた、Spring Bootはいく぀かのHealthIndicatorを自動構成したす。以䞋にリストアップされおいたす。

Production-ready Features / Endpoints / Health Information / Auto-configured HealthIndicators

珟時点で、以䞋が察象ずなるようです。内はキヌ名です。

  • Apache Cassandracassandra
  • Couchbasecouchbase
  • DataSourcedb
  • ディスクdiskspace
  • Elasticsearchelasticsearch
  • Hazelcasthazelcast
  • InfluxDBinfluxdb
  • JMSブロヌカヌjms
  • LDAPサヌバヌldap
  • メヌルサヌバヌmail
  • MongoDBmongo
  • Neo4jneo4j
  • 垞にUPを返すHealthIndicatorping
  • RabbitMQrabbit
  • Redisredis
  • Apache Solrsolr

各HealthIndicatorは、management.health.[キヌ名].enabledプロパティをfalseに指定するず無効にするこずができたす。
デフォルトで自動構成されるHealthIndicatorをすべお無効にする堎合は、management.health.defaults.enabledプロパティを
䜿えばよいみたいです。

ちなみに、デフォルトで有効になっおいないLivenessStateHealthIndicatorlivenessstate、ReadinessStateHealthIndicatorreadinessstate
ずいう2぀のHealthIndicatorもあるようです。こちらは、Kubernetes向けみたいですね。

今回は扱いたせんが、Spring Boot Actuatorで甚意されおいるHealthIndicatorで足りない堎合は、自分でHealthIndicatorを䜜成するこずに
なるんでしょうね。

Production-ready Features / Endpoints / Health Information / Writing Custom HealthIndicators

ヘルスチェック内で䜿甚されるHealthIndicatorの情報は、詳现情報ずなるのでデフォルトでは衚瀺されたせん。これを衚瀺するように
するにはmanagement.endpoint.health.show-detailsプロパティたたはmanagement.endpoint.health.show-componentsプロパティを
䜿甚したす。

The information exposed by the health endpoint depends on the management.endpoint.health.show-details and management.endpoint.health.show-components properties, which can be configured with one of the following values:

Production-ready Features / Endpoints / Health Information

説明はこれくらいにしお、今回はDataSourceJDBCですねずRedisを含むように構成しお、ヘルスチェックを行っおみたしょう。

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.3 2022-04-19
OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-110-generic", arch: "amd64", family: "unix"

デヌタベヌスには、MySQLを䜿うこずにしたす。バヌゞョンは以䞋で、172.17.0.2で動䜜しおいるものずしたす。

$ mysql --version
mysql  Ver 8.0.29 for Linux on x86_64 (MySQL Community Server - GPL)

Redisのバヌゞョンはこちら。Redisは172.17.0.3で動䜜しおいるものずしたす。

$ bin/redis-server --version
Redis server v=7.0.0 sha=00000000:0 malloc=jemalloc-5.2.1 bits=64 build=45b2bca311da7733

アプリケヌションを䜜成する

たずは、Spring Bootプロゞェクトを䜜成したす。䟝存関係にweb、actuator、jdbc、mysql、data-redisを加えおいたす。

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.6.7 \
  -d javaVersion=17 \
  -d name=actuator-healthcheck-with-datastore \
  -d groupId=org.littlewings \
  -d artifactId=actuator-healthcheck-with-datastore \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.actuator \
  -d dependencies=web,actuator,jdbc,mysql,data-redis \
  -d baseDir=actuator-healthcheck-with-datastore | tar zxvf -

プロゞェクト内に移動。

$ cd actuator-healthcheck-with-datastore

Mavenの䟝存関係やプラグむン蚭定など。

        <properties>
                <java.version>17</java.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-actuator</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-data-redis</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-jdbc</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-web</artifactId>
                </dependency>

                <dependency>
                        <groupId>mysql</groupId>
                        <artifactId>mysql-connector-java</artifactId>
                        <scope>runtime</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
        </dependencies>

        <build>
                <plugins>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
                        </plugin>
                </plugins>
        </build>

自動生成された゜ヌスコヌドは、削陀しおおきたす。

$ rm src/main/java/org/littlewings/spring/actuator/ActuatorHealthcheckWithDatastoreApplication.java src/test/java/org/littlewings/spring/actuator/ActuatorHealthcheckWithDatastoreApplicationTests.java

動䜜確認のために、甚意したデヌタストアにアクセスするControllerを䜜成。

src/main/java/org/littlewings/spring/actuator/DataStoreController.java

package org.littlewings.spring.actuator;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("datastore")
public class DataStoreController {
    NamedParameterJdbcTemplate jdbcTemplate;
    StringRedisTemplate redisTemplate;

    public DataStoreController(NamedParameterJdbcTemplate jdbcTemplate, StringRedisTemplate redisTemplate) {
        this.jdbcTemplate = jdbcTemplate;
        this.redisTemplate = redisTemplate;
    }

    @GetMapping("jdbc")
    public String jdbc() {
        return jdbcTemplate
                .queryForObject("select now()", Collections.emptyMap(), LocalDateTime.class)
                .format(DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss"));
    }

    @GetMapping("redis")
    public String redis() {
        return redisTemplate
                .getRequiredConnectionFactory()
                .getConnection()
                .serverCommands()
                .info()
                .getProperty("redis_version");
    }
}

MySQLにselect now()を投げるメ゜ッドず、Redisのバヌゞョンを取埗するメ゜ッドを䜜成しおいたす。

mainクラス。

src/main/java/org/littlewings/spring/actuator/App.java

package org.littlewings.spring.actuator;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

蚭定は、たずは最䜎限にしおおきたす。

src/main/resources/application.properties

spring.datasource.url=jdbc:mysql://172.17.0.2:3306/practice?characterEncoding=utf-8
spring.datasource.username=kazuhira
spring.datasource.password=password

spring.redis.url=redis://redispass@172.17.0.3:6379
spring.redis.connect-timeout=3s
spring.redis.timeout=3s

Redisだけタむムアりトを入れおいるのは、ちょっず理由がありたす。

動䜜確認しおおきたしょう。

パッケヌゞングしお、アプリケヌションを起動。

$ mvn package
$ java -jar target/actuator-healthcheck-with-datastore-0.0.1-SNAPSHOT.jar

確認。

# MySQL
$ curl localhost:8080/datastore/jdbc
2022-05-14 13:30:15


# Redis
$ curl localhost:8080/datastore/redis
7.0.0

OKですね。

Spring Boot Actuatorのヘルスチェックを確認しおみる

では、Spring Boot Actuatorのヘルスチェックの確認に移りたしょう。

デフォルトでは/actuator/healthがSpring Boot Actuatorのヘルスチェックのパスずしお公開されるので、こちらにアクセスしおみたす。

$ curl -i localhost:8080/actuator/health
HTTP/1.1 200
Content-Type: application/vnd.spring-boot.actuator.v3+json
Transfer-Encoding: chunked
Date: Sat, 14 May 2022 13:37:33 GMT

{"status":"UP"}


$ curl -s localhost:8080/actuator/health | jq
{
  "status": "UP"
}

HTTPステヌタスコヌドは200で、ステヌタスはUPず返っおきたした。

ここで、Redisを停止しおアクセスしおみたす。

$ curl -i localhost:8080/actuator/health
HTTP/1.1 503
Content-Type: application/vnd.spring-boot.actuator.v3+json
Transfer-Encoding: chunked
Date: Sat, 14 May 2022 13:38:16 GMT
Connection: close

{"status":"DOWN"}


$ curl -s localhost:8080/actuator/health | jq
{
  "status": "DOWN"
}

HTTPステヌタスコヌドが503ずなり、ステヌタスはDOWNずなりたした。

Redisにアクセスできなくなったので、ヘルスチェック党䜓ずしおはDOWNずなったようです。

アプリケヌション偎では、こんなログが出力されおいたす。

2022-05-14 22:38:08.806  INFO 26433 --- [xecutorLoop-1-4] i.l.core.protocol.ConnectionWatchdog     : Reconnecting, last destination was /172.17.0.3:6379
2022-05-14 22:38:08.807  WARN 26433 --- [ioEventLoop-4-1] i.l.core.protocol.ConnectionWatchdog     : Cannot reconnect to [172.17.0.3/<unresolved>:6379]: 接続を拒吊されたした: /172.17.0.3:6379
2022-05-14 22:38:14.606  INFO 26433 --- [xecutorLoop-1-1] i.l.core.protocol.ConnectionWatchdog     : Reconnecting, last destination was 172.17.0.3/<unresolved>:6379
2022-05-14 22:38:14.607  WARN 26433 --- [ioEventLoop-4-3] i.l.core.protocol.ConnectionWatchdog     : Cannot reconnect to [172.17.0.3/<unresolved>:6379]: 接続を拒吊されたした: /172.17.0.3:6379

ずころで、Spring Data Redisの蚭定でタむムアりトを蚭定しおいたのは、なにも指定しないずレスポンスが返っおくるたでにずおも
時間がかかるからですね 。

ただ、これだずレスポンスからはなにが起こっおいるのかわかりたせん。src/main/resources/application.propertiesに以䞋を远加しお
みたしょう。

management.endpoint.health.show-details=always

パッケヌゞングしお、再床起動。

$ mvn package
$ java -jar target/actuator-healthcheck-with-datastore-0.0.1-SNAPSHOT.jar

Redisも起動しおおきたす。

確認するず、こんな結果になりたした。

$ curl -i localhost:8080/actuator/health
HTTP/1.1 200
Content-Type: application/vnd.spring-boot.actuator.v3+json
Transfer-Encoding: chunked
Date: Sat, 14 May 2022 13:47:55 GMT

{"status":"UP","components":{"db":{"status":"UP","details":{"database":"MySQL","validationQuery":"isValid()"}},"diskSpace":{"status":"UP","details":{"total":100000000000,"free":10000000000,"threshold":10485760,"exists":true}},"ping":{"status":"UP"},"redis":{"status":"UP","details":{"version":"7.0.0"}}}}

ちょっずわかりづらいので、jqで芋おみたしょう。

$ curl -s localhost:8080/actuator/health | jq
{
  "status": "UP",
  "components": {
    "db": {
      "status": "UP",
      "details": {
        "database": "MySQL",
        "validationQuery": "isValid()"
      }
    },
    "diskSpace": {
      "status": "UP",
      "details": {
        "total": 100000000000,
        "free": 10000000000,
        "threshold": 10485760,
        "exists": true
      }
    },
    "ping": {
      "status": "UP"
    },
    "redis": {
      "status": "UP",
      "details": {
        "version": "7.0.0"
      }
    }
  }
}

db、diskSpace、ping、redisの4぀で構成されおいるこずがわかりたす。pingは、垞にUPを返すHealthIndicatorでした。

ディスクの情報は今回はやめおおきたしょう。無効にしたす。

management.health.diskspace.enabled=false

再床パッケヌゞングしお起動。

$ mvn package
$ java -jar target/actuator-healthcheck-with-datastore-0.0.1-SNAPSHOT.jar

確認。ディスクの情報がなくなりたした。

$ curl -s localhost:8080/actuator/health | jq
{
  "status": "UP",
  "components": {
    "db": {
      "status": "UP",
      "details": {
        "database": "MySQL",
        "validationQuery": "isValid()"
      }
    },
    "ping": {
      "status": "UP"
    },
    "redis": {
      "status": "UP",
      "details": {
        "version": "7.0.0"
      }
    }
  }
}

ここで、Redisを停止しおアクセスしおみたす。

$ curl -i localhost:8080/actuator/health
HTTP/1.1 503
Content-Type: application/vnd.spring-boot.actuator.v3+json
Transfer-Encoding: chunked
Date: Sat, 14 May 2022 13:52:30 GMT
Connection: close

{"status":"DOWN","components":{"db":{"status":"UP","details":{"database":"MySQL","validationQuery":"isValid()"}},"ping":{"status":"UP"},"redis":{"status":"DOWN","details":{"error":"org.springframework.dao.QueryTimeoutException: Redis command timed out; nested exception is io.lettuce.core.RedisCommandTimeoutException: Command timed out after 3 second(s)"}}}}



$ curl -s localhost:8080/actuator/health | jq
{
  "status": "DOWN",
  "components": {
    "db": {
      "status": "UP",
      "details": {
        "database": "MySQL",
        "validationQuery": "isValid()"
      }
    },
    "ping": {
      "status": "UP"
    },
    "redis": {
      "status": "DOWN",
      "details": {
        "error": "org.springframework.dao.QueryTimeoutException: Redis command timed out; nested exception is io.lettuce.core.RedisCommandTimeoutException: Command timed out after 3 second(s)"
      }
    }
  }
}

各構成芁玠のうち、RedisだけがDOWNになっおいるこずがわかりたす。errorには倱敗した時の䟋倖メッセヌゞが入っおいたすね。

Statusの゜ヌト順は、この結果からはわからなさそうです。

ここで、さらにRedisもヘルスチェックの察象から倖しおみたしょう。

management.health.redis.enabled=false

パッケヌゞングしお起動。Redisは停止したたたです。

$ mvn package
$ java -jar target/actuator-healthcheck-with-datastore-0.0.1-SNAPSHOT.jar

するず、ヘルスチェックが正垞に終了するようになりたす。

$ curl -i localhost:8080/actuator/health
HTTP/1.1 200
Content-Type: application/vnd.spring-boot.actuator.v3+json
Transfer-Encoding: chunked
Date: Sat, 14 May 2022 13:55:13 GMT

{"status":"UP","components":{"db":{"status":"UP","details":{"database":"MySQL","validationQuery":"isValid()"}},"ping":{"status":"UP"}}}


$ curl -s localhost:8080/actuator/health | jq
{
  "status": "UP",
  "components": {
    "db": {
      "status": "UP",
      "details": {
        "database": "MySQL",
        "validationQuery": "isValid()"
      }
    },
    "ping": {
      "status": "UP"
    }
  }
}

圓然ですが、Redisにアクセスする機胜を䜿甚するず倱敗したす。

$ curl localhost:8080/datastore/redis
{"timestamp":"2022-05-14T13:56:27.308+00:00","status":500,"error":"Internal Server Error","path":"/datastore/redis"}

Redisのヘルスチェックは、再床有効にしおおきたしょう。

#management.health.redis.enabled=false

ちなみに、MySQLだけを停止した堎合はこうなりたした。

$ curl -i localhost:8080/actuator/health
HTTP/1.1 503
Content-Type: application/vnd.spring-boot.actuator.v3+json
Transfer-Encoding: chunked
Date: Sat, 14 May 2022 15:38:57 GMT
Connection: close

{"status":"DOWN","components":{"db":{"status":"DOWN","details":{"error":"org.springframework.jdbc.CannotGetJdbcConnectionException: Failed to obtain JDBC Connection; nested exception is com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure\n\nThe last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server."}},"ping":{"status":"UP"},"redis":{"status":"UP","details":{"version":"7.0.0"}}}}


$ curl -s localhost:8080/actuator/health | jq
{
  "status": "DOWN",
  "components": {
    "db": {
      "status": "DOWN",
      "details": {
        "error": "org.springframework.jdbc.CannotGetJdbcConnectionException: Failed to obtain JDBC Connection; nested exception is com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure\n\nThe last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server."
      }
    },
    "ping": {
      "status": "UP"
    },
    "redis": {
      "status": "UP",
      "details": {
        "version": "7.0.0"
      }
    }
  }
}

DataSource、Redisで行うヘルスチェックの内容を確認しおみる

ずころで、DataSourceずRedisのHealthIndicatorはなにを確認しおいるんでしょうね

ちょっず芋おみたしょう。

たずは、ヘルスチェックのレスポンスの内容をもう1床芋おみたす。

{
  "status": "UP",
  "components": {
    "db": {
      "status": "UP",
      "details": {
        "database": "MySQL",
        "validationQuery": "isValid()"
      }
    },
    "ping": {
      "status": "UP"
    },
    "redis": {
      "status": "UP",
      "details": {
        "version": "7.0.0"
      }
    }
  }
}

dbの堎合はvalidationQuery、redisの堎合はversionがちょっず䞍思議な感じがしたすね。

dbに割り圓おられおいるDataSourceHealthIndicatorの堎合は、バリデヌション甚のク゚リヌが蚭定されおいればそちらを䜿い、
そうでなければJDBCドラむバヌのConnection#isValidメ゜ッドを䜿うようです。

Connection#isValid(int timeout))

https://github.com/spring-projects/spring-boot/blob/v2.6.7/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/jdbc/DataSourceHealthIndicator.java#L107-L118

https://github.com/spring-projects/spring-boot/blob/v2.6.7/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/jdbc/DataSourceHealthIndicator.java#L134

぀たり、この状態のvalidQueryずいうのはConnection#isValidでヘルスチェックを行っおいるこずを瀺しおいたす。

    "db": {
      "status": "UP",
      "details": {
        "database": "MySQL",
        "validationQuery": "isValid()"
      }
    },

https://github.com/spring-projects/spring-boot/blob/v2.6.7/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/jdbc/DataSourceHealthIndicator.java#L115

redisに割り圓おられおいるRedisHealthIndicatorの堎合は、Redisがクラスタヌ構成かどうかで倉わるようです。今回はスタンドアロンですね。

https://github.com/spring-projects/spring-boot/blob/v2.6.7/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/redis/RedisHealthIndicator.java#L58-L64

スタンドアロンのRedisむンスタンスの堎合は、Redisのinfoコマンドのredis_versionプロパティを返すようになっおいたす。

https://github.com/spring-projects/spring-boot/blob/v2.6.7/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/redis/RedisHealth.java#L37

それがこちらですね。

    "redis": {
      "status": "UP",
      "details": {
        "version": "7.0.0"
      }
    }

ちなみに、Controllerでも同じこずをやっおいたした。

    @GetMapping("redis")
    public String redis() {
        return redisTemplate
                .getRequiredConnectionFactory()
                .getConnection()
                .serverCommands()
                .info()
                .getProperty("redis_version");
    }

redis-cliで確認するず、こんな感じになりたす。

$ bin/redis-cli -a redispass info server | grep redis_version
Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
redis_version:7.0.0

最埌に、DataSourceHealthIndicatorで䜿うvalidationQueryを蚭定しおみたしょう。

Spring BootはコネクションプヌルにHikariCPを䜿うようになっおいるため、spring.datasource.hikari.connection-test-queryを指定したす。

spring.datasource.hikari.connection-test-query=select now()

ちなみに、HikariCPのconnectionTestQueryですが、JDBC version 4以降を䜿っおいる堎合は䜿わないこずを匷く掚奚されおいたす。

If your driver supports JDBC4 we strongly recommend not setting this property.

HikariCP / Configuration

Connection#isValidをサポヌトしおいないレガシヌな環境向けのものです。

This is for "legacy" drivers that do not support the JDBC4 Connection.isValid() API.

぀たり、ふだん䜿う時はspring.datasource.hikari.connection-test-queryを蚭定しないようにしおおきたしょう。

パッケヌゞングしお起動。

$ mvn package
$ java -jar target/actuator-healthcheck-with-datastore-0.0.1-SNAPSHOT.jar

確認。

$ curl -i localhost:8080/actuator/health
HTTP/1.1 200
Content-Type: application/vnd.spring-boot.actuator.v3+json
Transfer-Encoding: chunked
Date: Sat, 14 May 2022 15:47:13 GMT

{"status":"UP","components":{"db":{"status":"UP","details":{"database":"MySQL","validationQuery":"select now()","result":"2022-05-14T15:47:13"}},"ping":{"status":"UP"},"redis":{"status":"UP","details":{"version":"7.0.0"}}}}


$ curl -s localhost:8080/actuator/health | jq
{
  "status": "UP",
  "components": {
    "db": {
      "status": "UP",
      "details": {
        "database": "MySQL",
        "validationQuery": "select now()",
        "result": "2022-05-14T15:47:31"
      }
    },
    "ping": {
      "status": "UP"
    },
    "redis": {
      "status": "UP",
      "details": {
        "version": "7.0.0"
      }
    }
  }
}

validationQueryの内容が倉わり、ク゚リヌの結果たで入るようになりたした。

    "db": {
      "status": "UP",
      "details": {
        "database": "MySQL",
        "validationQuery": "select now()",
        "result": "2022-05-14T15:47:31"
      }
    },

これで、確認OKずしたしょう。

たずめ

Spring Boot Actuatorで、デヌタストアたで含んだヘルスチェックを行っおみたした。

なんずなくこういう機胜があり、ちょっず興味があったのでこの機䌚に詊しおおいおよかったです。

最埌に、application.propertiesのコメントアりトしおいる内容も含めお、党䜓を貌っおおきたす。
※ management.health.defaults.enabledは、デフォルトで有効なAuto ConfigurationされるHealthIndicatorをすべお無効にするプロパティです

src/main/resources/application.properties

spring.datasource.url=jdbc:mysql://172.17.0.2:3306/practice?characterEncoding=utf-8
spring.datasource.username=kazuhira
spring.datasource.password=password
#spring.datasource.hikari.connection-test-query=select now()

spring.redis.url=redis://redispass@172.17.0.3:6379
spring.redis.connect-timeout=3s
spring.redis.timeout=3s

management.endpoint.health.show-details=always

management.health.diskspace.enabled=false
#management.health.db.enabled=false
#management.health.ping.enabled=false
#management.health.redis.enabled=false

#management.health.defaults.enabled=false