What is this post about?
While reading the Spring Batch documentation I noticed that it can be combined with Spring Integration, so I decided to give it a quick try.
Spring Batch Integration
The combination of Spring Batch and Spring Integration is covered in the documentation section cited below.
It is apparently called Spring Batch Integration.
The idea seems to be that both Spring Batch users and Spring Integration users may run into cases where a requirement is met more efficiently by using the two together.
Many users of Spring Batch may encounter requirements that are outside the scope of Spring Batch but that may be efficiently and concisely implemented by using Spring Integration. Conversely, Spring Integration users may encounter Spring Batch requirements and need a way to efficiently integrate both frameworks.
Spring Batch Integration / Spring Batch Integration Introduction
Adding messaging to a batch process makes it possible to automate operations and to separate key concerns.
Adding messaging to a batch process enables automation of operations and also separation and strategizing of key concerns.
Roughly speaking, it appears to be a way to launch a Spring Batch Job from a messaging mechanism.
It also seems possible to embed messaging inside a job and distribute the workload across multiple workers (remote partitioning, remote chunking).
About launching Spring Batch through messaging:
You can launch Spring Batch with CommandLineJobRunner, or work with JobOperator directly in a web application, but the documentation goes on to list more complex use cases.
You might need to poll a remote FTP or SFTP server to fetch the data for a batch job, or you might need to support several different data sources.
Maybe you need to poll a remote (S)FTP server to retrieve the data for the Batch Job or your application has to support multiple different data sources simultaneously.
You may receive source data files not only from the web but also via FTP and other channels. The input files may also need to be transformed before Spring Batch is invoked.
For example, you may receive data files not only from the web, but also from FTP and other sources. Maybe additional transformation of the input files is needed before invoking Spring Batch.
In such cases, running the batch job through Spring Integration and its many adapters gives you a more powerful setup.
Therefore, it would be much more powerful to execute the batch job using Spring Integration and its numerous adapters.
For example, you can use the File Inbound Channel Adapter to watch a directory on the file system and start the batch job as soon as an input file arrives. You can also build Spring Integration flows that use several different adapters, so that data for batch jobs is ingested from multiple sources at the same time using configuration alone.
For example, you can use a File Inbound Channel Adapter to monitor a directory in the file-system and start the Batch Job as soon as the input file arrives. Additionally, you can create Spring Integration flows that use multiple different adapters to easily ingest data for your batch jobs from multiple sources simultaneously using only configuration.
To make this possible, Spring Batch Integration provides the following.
- JobLaunchingMessageHandler, which launches batch jobs
- JobLaunchRequest, the payload that is fed to JobLaunchingMessageHandler
JobLaunchRequest is a wrapper around the Job and the JobParameters needed to launch a batch job.
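The division of roles described here (a payload is turned into a launch request, and a handler launches the job from that request) can be pictured with a small plain-Java sketch. It does not use Spring at all: `LaunchRequest`, `toRequest`, `handle`, the job name `sampleJob`, and the parameter key are all made up for illustration and only mimic the shape of JobLaunchRequest and JobLaunchingMessageHandler.

```java
import java.io.File;
import java.util.Map;

class LaunchPipelineSketch {

    /** Hypothetical stand-in for JobLaunchRequest: a job name plus launch parameters. */
    static final class LaunchRequest {
        final String jobName;
        final Map<String, String> parameters;

        LaunchRequest(String jobName, Map<String, String> parameters) {
            this.jobName = jobName;
            this.parameters = parameters;
        }
    }

    /** Transformer role: turn the incoming payload (a file) into a launch request. */
    static LaunchRequest toRequest(File file) {
        // "sampleJob" and "input.file.path" are illustrative names only
        return new LaunchRequest("sampleJob", Map.of("input.file.path", file.getPath()));
    }

    /** Handler role: pretend to launch the job described by the request and report a status. */
    static String handle(LaunchRequest request) {
        return "COMPLETED: " + request.jobName + " " + request.parameters;
    }

    public static void main(String[] args) {
        LaunchRequest request = toRequest(new File("in.csv"));
        System.out.println(handle(request));
    }
}
```

In the real library the handler delegates to a JobLauncher and returns a JobExecution rather than a string; the sketch only shows which piece owns which responsibility.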
The documentation then continues with how to transform a Spring Integration Message into a JobLaunchRequest, notes on JobExecution (checking the status by consulting the job repository), a Spring Batch Integration configuration example, and an example ItemReader configuration for use within the job.
That said, just staring at the documentation does not make things clear, so let's actually try it.
The task
Here is what I will build this time.
- Poll a specific directory and watch for CSV files being placed there
- Launch Spring Batch when a CSV file is placed
- The Spring Batch job consists of an ItemReader that reads the CSV file and an ItemWriter that applies the data to the database
  - JPA is used to write to the database
- Placing the same file again launches the job again and overwrites the existing rows
- The database table has an update-time column so that the overwrite is visible
Environment
The environment this time is as follows.
$ java --version
openjdk 17.0.3 2022-04-19
OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)

$ mvn --version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-110-generic", arch: "amd64", family: "unix"
MySQL is used as the database. I assume MySQL is running at 172.17.0.2.
$ mysql --version mysql Ver 8.0.29 for Linux on x86_64 (MySQL Community Server - GPL)
Creating the project
First, create a Spring Boot project. The dependencies are batch, integration, data-jpa, and mysql.
$ curl -s https://start.spring.io/starter.tgz \
    -d bootVersion=2.6.7 \
    -d javaVersion=17 \
    -d name=batch-integration-example \
    -d groupId=org.littlewings \
    -d artifactId=batch-integration-example \
    -d version=0.0.1-SNAPSHOT \
    -d packageName=org.littlewings.spring.batch.integration \
    -d dependencies=batch,integration,data-jpa,mysql \
    -d baseDir=batch-integration-example | tar zxvf -
Move into the project.
$ cd batch-integration-example
Maven dependencies and plugin settings.
<properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
Delete the generated source code.
$ rm src/main/java/org/littlewings/spring/batch/integration/BatchIntegrationExampleApplication.java src/test/java/org/littlewings/spring/batch/integration/BatchIntegrationExampleApplicationTests.java
Some libraries I want to use are missing and some are not needed, so I changed the dependencies a little.
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <!-- <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-jpa</artifactId> </dependency> --> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-file</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-test</artifactId> <scope>test</scope> </dependency> </dependencies>
First, I added spring-batch-integration, which is the main subject this time. Then I added spring-integration-file for watching files with Spring Integration.
And since spring-integration-jpa is not used, it is commented out.
The table definition is as follows.
src/main/resources/schema.sql
drop table if exists person;

create table person (
  id integer,
  last_name varchar(10),
  first_name varchar(10),
  age integer,
  updated_time datetime,
  primary key(id)
);
The application settings are here.
src/main/resources/application.properties
spring.datasource.url=jdbc:mysql://172.17.0.2:3306/practice?characterEncoding=utf-8
spring.datasource.username=kazuhira
spring.datasource.password=password
spring.sql.init.mode=always
spring.batch.jdbc.initialize-schema=always
spring.batch.job.enabled=false
schema.sql is always applied, and the tables used by Spring Batch are created as well.
There is one more point in these settings, which I will explain later.
Creating the application
Now let's build the application.
First, the main class.
src/main/java/org/littlewings/spring/batch/integration/App.java
package org.littlewings.spring.batch.integration;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}
The @EnableBatchProcessing annotation is required, because Spring Batch's JobBuilderFactory, StepBuilderFactory, and related beans need to be configured.
The JPA entity class.
src/main/java/org/littlewings/spring/batch/integration/Person.java
package org.littlewings.spring.batch.integration;

import java.time.LocalDateTime;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "person")
public class Person {
    @Id
    @Column(name = "id")
    Integer id;

    @Column(name = "last_name")
    String lastName;

    @Column(name = "first_name")
    String firstName;

    @Column(name = "age")
    Integer age;

    @Column(name = "updated_time")
    LocalDateTime updatedTime;

    // getters and setters omitted
}
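The entity has an updatedTime field that has no counterpart in the CSV input (the updated_time column exists only in the table definition); it is set by an ItemProcessor.
From here on, I configure Spring Integration and Spring Batch while referring to the documentation.
First, a skeleton of the class annotated with @Configuration.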
src/main/java/org/littlewings/spring/batch/integration/IntegrationJobConfig.java
package org.littlewings.spring.batch.integration; import java.io.File; import java.nio.file.Paths; import java.time.Duration; import java.time.LocalDateTime; import javax.persistence.EntityManagerFactory; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.integration.launch.JobLaunchingGateway; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.database.JpaItemWriter; import org.springframework.batch.item.database.builder.JpaItemWriterBuilder; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.PathResource; import org.springframework.core.io.Resource; import org.springframework.core.task.SyncTaskExecutor; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.file.dsl.Files; import org.springframework.integration.file.filters.SimplePatternFileListFilter; import 
org.springframework.integration.handler.LoggingHandler;

@Configuration
public class IntegrationJobConfig {
    // filled in later
}
Let's start with watching for CSV files, using Spring Integration's File Support.
@Bean public IntegrationFlow integrationFlow() throws Exception { return IntegrationFlows .from( Files .inboundAdapter(new File("target/files")) .filter(new SimplePatternFileListFilter("*.csv")), c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L)) ) .log(LoggingHandler.Level.INFO, "polling file") .transform(fileMessageToJobRequest(null)) .log(LoggingHandler.Level.INFO, "transform job request") .handle(jobLaunchingGateway(null)) .log(LoggingHandler.Level.INFO, "job execution result") .get(); }
Files with the .csv extension under the target/files directory are polled every second. At most one file is picked up per poll.
return IntegrationFlows .from( Files .inboundAdapter(new File("target/files")) .filter(new SimplePatternFileListFilter("*.csv")), c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L)) )
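To make the polling behavior concrete, here is a minimal plain-Java sketch of a single poll: list the directory, keep only names matching `*.csv`, and hand back at most one file. It is not how Spring Integration implements the adapter; `CsvPollSketch`, `pollOnce`, and `demo` are hypothetical names, and the sort is only there to make the result deterministic.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class CsvPollSketch {

    /** One poll: list files in dir that match the glob and return at most maxFiles of them. */
    static List<Path> pollOnce(Path dir, String glob, int maxFiles) {
        List<Path> found = new ArrayList<>();
        try {
            Files.createDirectories(dir); // make sure the watched directory exists
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
                for (Path p : stream) {
                    found.add(p);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(found); // deterministic order for the sketch
        return new ArrayList<>(found.subList(0, Math.min(maxFiles, found.size())));
    }

    /** Creates a temporary directory with two CSV files and one text file, then polls once. */
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("poll-sketch");
            Files.createFile(dir.resolve("b.csv"));
            Files.createFile(dir.resolve("a.csv"));
            Files.createFile(dir.resolve("notes.txt"));
            List<String> names = new ArrayList<>();
            for (Path p : pollOnce(dir, "*.csv", 1)) {
                names.add(p.getFileName().toString());
            }
            return names;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a.csv]
    }
}
```

In the flow above the poller repeats this kind of scan once per second, and each file found becomes the payload of one message.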
When a file is detected, it is transformed (transform) and then processed (handle). Log output is inserted between the steps.
The second argument of the log method is the logger name.
.log(LoggingHandler.Level.INFO, "polling file") .transform(fileMessageToJobRequest(null)) .log(LoggingHandler.Level.INFO, "transform job request") .handle(jobLaunchingGateway(null)) .log(LoggingHandler.Level.INFO, "job execution result") .get();
What is passed to the transform method is a Transformer that converts the Spring Integration Message into a JobLaunchRequest for the Spring Batch Job.
Before that, though, let's look at what is passed to the handle method.
Here a JobLaunchingGateway is created.
@Bean public JobLaunchingGateway jobLaunchingGateway(JobRepository jobRepository) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository); jobLauncher.setTaskExecutor(new SyncTaskExecutor()); jobLauncher.afterPropertiesSet(); return new JobLaunchingGateway(jobLauncher); }
JobLaunchingGateway is an implementation of the MessageHandler interface that runs a Spring Batch Job.
JobLaunchingGateway (Spring Batch 4.3.5 API)
For that reason it requires a JobLauncher instance.
Configuring and Running a Job / Configuring a JobLauncher
From here on, the Spring Batch Job is put together.
The Job definition. RunIdIncrementer is specified so that the job can be launched multiple times with the same JobParameters.
@Bean public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) { return jobBuilderFactory .get("loadFileToDatabaseJob") .incrementer(new RunIdIncrementer()) .start(loadFileToDatabaseStep(null)) .next(deleteFileStep(null)) .build(); }
There are two Steps.
The first Step reads the file and writes its contents to the database.
@Bean public Step loadFileToDatabaseStep(StepBuilderFactory stepBuilderFactory) { return stepBuilderFactory .get("loadFileToDatabaseStep") .<Person, Person>chunk(3) .reader(flatFilePersonItemReader(null)) .processor(updateTimeProcessor()) .writer(jpaPersonItemWriter(null)) .build(); }
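As a rough picture of what `chunk(3)` means, the following plain-Java sketch reads up to three items, processes them, and then writes them together, repeating until the input is exhausted. It is a simplification under my own assumptions, with no transactions, skips, or restarts; `ChunkSketch` and `run` are hypothetical names, not Spring Batch API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

class ChunkSketch {

    /**
     * Reads up to chunkSize items, processes each one, then "writes" the processed
     * items as one list. Repeats until the reader is exhausted and returns every chunk written.
     */
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int chunkSize) {
        List<List<O>> written = new ArrayList<>();
        while (reader.hasNext()) {
            List<I> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            List<O> processed = new ArrayList<>();
            for (I item : items) {
                processed.add(processor.apply(item));
            }
            written.add(processed); // a real step would call the writer here, once per chunk
        }
        return written;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5);
        // prints [[person-1, person-2, person-3], [person-4, person-5]]
        System.out.println(run(ids.iterator(), id -> "person-" + id, 3));
    }
}
```

With five input rows and a chunk size of 3, the writer is invoked twice: once with three items and once with the remaining two.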
The ItemReader, ItemProcessor, and ItemWriter are defined as follows.
The flow is: read the CSV file → set updatedTime on the entity → write to the database with JPA.
@Bean @StepScope public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) { Resource resource = new PathResource(path); return new FlatFileItemReaderBuilder<Person>() .name("flatFilePersonItemReader") .resource(resource) .encoding("UTF-8") .delimited() .names(new String[]{"id", "lastName", "firstName", "age"}) .linesToSkip(1) .targetType(Person.class) .build(); } @Bean @StepScope public ItemProcessor<Person, Person> updateTimeProcessor() { return (person) -> { person.setUpdatedTime(LocalDateTime.now()); return person; }; } @Bean @StepScope public JpaItemWriter<Person> jpaPersonItemWriter(EntityManagerFactory entityManagerFactory) { return new JpaItemWriterBuilder<Person>() .entityManagerFactory(entityManagerFactory) .build(); }
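The reader settings `delimited()`, `names(...)`, and `linesToSkip(1)` can be understood as "skip the header line, split each remaining line on commas, and pair each value with a field name". The plain-Java sketch below does only that; `DelimitedLineSketch` and `parse` are hypothetical names, the sample row is made up, and quoting, type conversion, and binding to the Person class are deliberately left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DelimitedLineSketch {

    /** Skips the first linesToSkip lines, splits the rest on commas, and pairs values with field names. */
    static List<Map<String, String>> parse(List<String> lines, String[] names, int linesToSkip) {
        List<Map<String, String>> rows = new ArrayList<>();
        int start = Math.min(linesToSkip, lines.size());
        for (String line : lines.subList(start, lines.size())) {
            String[] values = line.split(",", -1);
            Map<String, String> row = new LinkedHashMap<>();
            for (int i = 0; i < names.length && i < values.length; i++) {
                row.put(names[i], values[i].trim());
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Illustrative input: a header line followed by one data row
        List<String> lines = List.of("id,last_name,first_name,age", "1,Isono,Katsuo,11");
        String[] names = {"id", "lastName", "firstName", "age"};
        // prints [{id=1, lastName=Isono, firstName=Katsuo, age=11}]
        System.out.println(parse(lines, names, 1));
    }
}
```

The field names come from `names(...)` rather than from the header row, which is why the header has to be skipped explicitly.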
The path of the input CSV file is obtained from the JobParameters.
@Bean @StepScope public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {
The last Step deletes the file that was read. Without it the file stays in the watched directory, and the import would be repeated endlessly.
@Bean public Step deleteFileStep(StepBuilderFactory stepBuilderFactory) { return stepBuilderFactory .get("deleteFileStep") .tasklet(deleteFileTasklet(null)) .build(); }
The Tasklet that deletes the file. The file could probably be deleted by some means other than a Step like this, but the deletion has to be written somewhere either way, so I went with this approach.
@Bean @StepScope public Tasklet deleteFileTasklet(@Value("#{jobParameters['input.file.path']}") String path) { return (contribution, context) -> { java.nio.file.Files.delete(Paths.get(path)); return RepeatStatus.FINISHED; }; }
This is the Job defined with Spring Batch.
@Bean public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) { return jobBuilderFactory .get("loadFileToDatabaseJob") .incrementer(new RunIdIncrementer()) .start(loadFileToDatabaseStep(null)) .next(deleteFileStep(null)) .build(); }
To run the Job, the Transformer converts the Spring Integration Message into a launch request for it.
public IntegrationFlow integrationFlow() throws Exception { return IntegrationFlows .from( Files .inboundAdapter(new File("target/files")) .filter(new SimplePatternFileListFilter("*.csv")), c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L)) ) .log(LoggingHandler.Level.INFO, "polling file") .transform(fileMessageToJobRequest(null)) .log(LoggingHandler.Level.INFO, "transform job request") .handle(jobLaunchingGateway(null)) .log(LoggingHandler.Level.INFO, "job execution result") .get();
The bean definition for it is as follows.
@Bean public FileMessageToJobRequest fileMessageToJobRequest(JobExplorer jobExplorer) { FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest(); fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null)); fileMessageToJobRequest.setJobExplorer(jobExplorer); return fileMessageToJobRequest; }
This follows the example in the documentation.
The Job and the JobExplorer are set on it.
fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null));
fileMessageToJobRequest.setJobExplorer(jobExplorer);
The class definition is as follows.
src/main/java/org/littlewings/spring/batch/integration/FileMessageToJobRequest.java
package org.littlewings.spring.batch.integration;

import java.io.File;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

public class FileMessageToJobRequest {
    Job job;
    JobExplorer jobExplorer;

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(jobExplorer);  // for JobParametersIncrementer

        jobParametersBuilder.getNextJobParameters(job)  // for JobParametersIncrementer
                .addString("input.file.path", message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }

    public Job getJob() {
        return job;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    public JobExplorer getJobExplorer() {
        return jobExplorer;
    }

    public void setJobExplorer(JobExplorer jobExplorer) {
        this.jobExplorer = jobExplorer;
    }
}
The key point is the following method. It is annotated with @Transformer and converts the CSV file detected by Spring Integration's File Support (Message<File>) into a JobLaunchRequest.
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(jobExplorer);  // for JobParametersIncrementer

    jobParametersBuilder.getNextJobParameters(job)  // for JobParametersIncrementer
            .addString("input.file.path", message.getPayload().getAbsolutePath());

    return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
The path of the CSV file is set as a JobParameters entry in this part.
.addString("input.file.path", message.getPayload().getAbsolutePath());
The value specified here is what gets passed to the following.
@Bean @StepScope public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) {
Incidentally, the sample code in the documentation does not specify a JobExplorer. Because I use a JobParametersIncrementer (RunIdIncrementer) this time, the JobParametersBuilder needs a JobExplorer and a call to JobParametersBuilder#getNextJobParameters.
Without RunIdIncrementer, running with JobParameters that are otherwise as similar as possible would probably require setting a different JobParameters value on every execution of the Job, as shown below.
@Transformer public JobLaunchRequest toRequest(Message<File> message) { JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); jobParametersBuilder .addString("input.file.path", message.getPayload().getAbsolutePath()) .addString("job.start.time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss"))); return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters()); }
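The reason an incrementer (or a changing parameter such as a timestamp) is needed is that Spring Batch identifies a job instance by the job name plus its identifying parameters, and a completed instance cannot simply be launched again with identical parameters. The following plain-Java sketch models only that rule; `RunIdSketch`, `launch`, and `next` are hypothetical names, and a boolean return value stands in for the exception the real launcher would raise.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class RunIdSketch {
    private final Set<Map<String, String>> completedInstances = new HashSet<>();
    private long lastRunId = 0;

    /** Returns true if a new instance was started, false if an identical parameter set already ran. */
    boolean launch(Map<String, String> parameters) {
        return completedInstances.add(new TreeMap<>(parameters));
    }

    /** Incrementer role: copy the parameters and add the next run.id value. */
    Map<String, String> next(Map<String, String> parameters) {
        Map<String, String> copy = new TreeMap<>(parameters);
        copy.put("run.id", Long.toString(++lastRunId));
        return copy;
    }

    public static void main(String[] args) {
        RunIdSketch jobs = new RunIdSketch();
        Map<String, String> params = Map.of("input.file.path", "/tmp/in.csv");

        System.out.println(jobs.launch(params));            // true: first run
        System.out.println(jobs.launch(params));            // false: same parameters again
        System.out.println(jobs.launch(jobs.next(params))); // true: run.id=1 makes it distinct
        System.out.println(jobs.launch(jobs.next(params))); // true: run.id=2
    }
}
```

This is why placing the same CSV file twice still works in this setup: the file path is identical, but run.id differs on each launch.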
That completes the Spring Batch and Spring Integration configuration.
The whole class that defines the Spring Batch Job and the Spring Integration IntegrationFlow looks like this.
src/main/java/org/littlewings/spring/batch/integration/IntegrationJobConfig.java
package org.littlewings.spring.batch.integration; import java.io.File; import java.nio.file.Paths; import java.time.Duration; import java.time.LocalDateTime; import javax.persistence.EntityManagerFactory; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.integration.launch.JobLaunchingGateway; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.database.JpaItemWriter; import org.springframework.batch.item.database.builder.JpaItemWriterBuilder; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.PathResource; import org.springframework.core.io.Resource; import org.springframework.core.task.SyncTaskExecutor; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.file.dsl.Files; import org.springframework.integration.file.filters.SimplePatternFileListFilter; import 
org.springframework.integration.handler.LoggingHandler; @Configuration public class IntegrationJobConfig { @Bean public IntegrationFlow integrationFlow() throws Exception { return IntegrationFlows .from( Files .inboundAdapter(new File("target/files")) .filter(new SimplePatternFileListFilter("*.csv")), c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(1L)).maxMessagesPerPoll(1L)) ) .log(LoggingHandler.Level.INFO, "polling file") .transform(fileMessageToJobRequest(null)) .log(LoggingHandler.Level.INFO, "transform job request") .handle(jobLaunchingGateway(null)) .log(LoggingHandler.Level.INFO, "job execution result") .get(); } @Bean public FileMessageToJobRequest fileMessageToJobRequest(JobExplorer jobExplorer) { FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest(); fileMessageToJobRequest.setJob(loadFileToDatabaseJob(null)); fileMessageToJobRequest.setJobExplorer(jobExplorer); return fileMessageToJobRequest; } @Bean public JobLaunchingGateway jobLaunchingGateway(JobRepository jobRepository) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository); jobLauncher.setTaskExecutor(new SyncTaskExecutor()); jobLauncher.afterPropertiesSet(); return new JobLaunchingGateway(jobLauncher); } @Bean public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) { return jobBuilderFactory .get("loadFileToDatabaseJob") .incrementer(new RunIdIncrementer()) .start(loadFileToDatabaseStep(null)) .next(deleteFileStep(null)) .build(); } @Bean public Step loadFileToDatabaseStep(StepBuilderFactory stepBuilderFactory) { return stepBuilderFactory .get("loadFileToDatabaseStep") .<Person, Person>chunk(3) .reader(flatFilePersonItemReader(null)) .processor(updateTimeProcessor()) .writer(jpaPersonItemWriter(null)) .build(); } @Bean public Step deleteFileStep(StepBuilderFactory stepBuilderFactory) { return stepBuilderFactory .get("deleteFileStep") .tasklet(deleteFileTasklet(null)) .build(); } @Bean 
@StepScope public FlatFileItemReader<Person> flatFilePersonItemReader(@Value("#{jobParameters['input.file.path']}") String path) { Resource resource = new PathResource(path); return new FlatFileItemReaderBuilder<Person>() .name("flatFilePersonItemReader") .resource(resource) .encoding("UTF-8") .delimited() .names(new String[]{"id", "lastName", "firstName", "age"}) .linesToSkip(1) .targetType(Person.class) .build(); } @Bean @StepScope public ItemProcessor<Person, Person> updateTimeProcessor() { return (person) -> { person.setUpdatedTime(LocalDateTime.now()); return person; }; } @Bean @StepScope public JpaItemWriter<Person> jpaPersonItemWriter(EntityManagerFactory entityManagerFactory) { return new JpaItemWriterBuilder<Person>() .entityManagerFactory(entityManagerFactory) .build(); } @Bean @StepScope public Tasklet deleteFileTasklet(@Value("#{jobParameters['input.file.path']}") String path) { return (contribution, context) -> { java.nio.file.Files.delete(Paths.get(path)); return RepeatStatus.FINISHED; }; } }
Trying it out
Now let's run the application.
Prepare two CSV files to load.
src/main/resources/isono_family.csv
id,last_name,first_name,age
1,磯野,カツオ,11
2,磯野,ワカメ,9
3,フグ田,サザエ,23
4,フグ田,マスオ,32
5,フグ田,タラオ,3
src/main/resources/namino_family.csv
id,last_name,first_name,age
6,波野,ノリスケ,24
7,波野,タイコ,22
8,波野,イクラ,1
I put them in src/main/resources for no particular reason, so they exist on the classpath, but the classpath is not what gets watched.
Package the application.
$ mvn package
Start it.
$ java -jar target/batch-integration-example-0.0.1-SNAPSHOT.jar
Although this is not a web application, it keeps running like a server.
At this point, the following setting was in src/main/resources/application.properties.
spring.batch.job.enabled=false
Note that without it, the default behavior is to try to launch every Job at application startup.
This time there is only the Job used through Spring Batch Integration, and it should be launched by directory watching, so spring.batch.job.enabled is set to false to keep the Job from running at application startup.
Also, when directory watching is used, it seems that the watched directory is created if it does not exist.
$ ll target/files
合計 8
drwxrwxr-x 2 xxxxx xxxxx 4096  5月 20 01:19 ./
drwxrwxr-x 9 xxxxx xxxxx 4096  5月 20 01:19 ../
First, check the state of the table.
mysql> select * from person; Empty set (0.00 sec)
Copy the first file into the watched directory.
$ cp src/main/resources/isono_family.csv target/files
The application then starts working.
2022-05-20 01:22:30.249 INFO 18890 --- [ scheduling-1] polling file : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ddfff35a-5d0c-89ed-905e-f00d2fc00a6c, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350247}] 2022-05-20 01:22:30.249 INFO 18890 --- [ scheduling-1] o.s.i.h.s.MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one. 2022-05-20 01:22:30.307 INFO 18890 --- [ scheduling-1] transform job request : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=37b5abff-b5b8-61ab-e0c1-d73813bf4c2f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350307}] 2022-05-20 01:22:30.509 INFO 18890 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] 2022-05-20 01:22:30.668 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadFileToDatabaseStep] 2022-05-20 01:22:30.975 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [loadFileToDatabaseStep] executed in 306ms 2022-05-20 01:22:31.061 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [deleteFileStep] 2022-05-20 01:22:31.169 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [deleteFileStep] executed in 108ms 2022-05-20 01:22:31.224 INFO 18890 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=1, 
input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 668ms 2022-05-20 01:22:31.231 INFO 18890 --- [ scheduling-1] job execution result : GenericMessage [payload=JobExecution: id=1, version=2, startTime=Fri May 20 01:22:30 JST 2022, endTime=Fri May 20 01:22:31 JST 2022, lastUpdated=Fri May 20 01:22:31 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=82978876-8ca8-12d7-4fe6-9faeaf0312ec, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977351224}]
The log shows the file being detected.
2022-05-20 01:22:30.249 INFO 18890 --- [ scheduling-1] polling file : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ddfff35a-5d0c-89ed-905e-f00d2fc00a6c, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350247}]
It is converted into a launch request for the Job.
2022-05-20 01:22:30.307 INFO 18890 --- [ scheduling-1] transform job request : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=37b5abff-b5b8-61ab-e0c1-d73813bf4c2f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977350307}]
The Spring Batch Job is executed.
2022-05-20 01:22:30.668 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadFileToDatabaseStep] 2022-05-20 01:22:30.975 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [loadFileToDatabaseStep] executed in 306ms 2022-05-20 01:22:31.061 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [deleteFileStep] 2022-05-20 01:22:31.169 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [deleteFileStep] executed in 108ms 2022-05-20 01:22:31.224 INFO 18890 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 668ms
and the run finishing.
2022-05-20 01:22:31.231 INFO 18890 --- [ scheduling-1] job execution result : GenericMessage [payload=JobExecution: id=1, version=2, startTime=Fri May 20 01:22:30 JST 2022, endTime=Fri May 20 01:22:31 JST 2022, lastUpdated=Fri May 20 01:22:31 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=1, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=82978876-8ca8-12d7-4fe6-9faeaf0312ec, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652977351224}]
Let's check the data.
mysql> select * from person;
+----+-----------+------------+------+---------------------+
| id | last_name | first_name | age  | updated_time        |
+----+-----------+------------+------+---------------------+
|  1 | 磯野      | カツオ     |   11 | 2022-05-20 01:22:31 |
|  2 | 磯野      | ワカメ     |    9 | 2022-05-20 01:22:31 |
|  3 | フグ田    | サザエ     |   23 | 2022-05-20 01:22:31 |
|  4 | フグ田    | マスオ     |   32 | 2022-05-20 01:22:31 |
|  5 | フグ田    | タラオ     |    3 | 2022-05-20 01:22:31 |
+----+-----------+------------+------+---------------------+
5 rows in set (0.00 sec)
The data has been loaded correctly.
The file placed in the watched directory has been deleted by the Tasklet included in the Spring Batch Job.
$ tree target/files
target/files

0 directories, 0 files
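As a rough illustration of what the delete step amounts to, here is a minimal JDK-only sketch. It is not the Tasklet from this article: the class name `DeleteInputFileSketch` and its helper methods are hypothetical, and a real Tasklet would receive the path through the `input.file.path` job parameter rather than from a temporary file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical sketch of the core of a "delete the processed input file" step. */
final class DeleteInputFileSketch {

    /** Deletes the file at the given path; returns true only if a file was actually removed. */
    static boolean deleteInputFile(String inputFilePath) {
        try {
            return Files.deleteIfExists(Paths.get(inputFilePath));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates a throwaway file that stands in for a CSV dropped into the watched directory. */
    static Path createSampleFile() {
        try {
            return Files.createTempFile("sample_family", ".csv");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path file = createSampleFile();
        System.out.println("deleted: " + deleteInputFile(file.toString()));
        System.out.println("still exists: " + Files.exists(file));
    }
}
```

Using `Files.deleteIfExists` instead of `Files.delete` means a missing file is reported through the return value rather than an exception, which is one possible design choice for a step that may be re-run.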
Let's copy another file into the watched directory as well.
$ cp src/main/resources/namino_family.csv target/files
As before, the placed file was detected and Spring Batch was launched.
2022-05-20 01:31:28.237 INFO 18890 --- [ scheduling-1] polling file : GenericMessage [payload=target/files/namino_family.csv, headers={file_originalFile=target/files/namino_family.csv, id=2e9cac33-a5be-8eeb-a471-d47f579906cc, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888237}]
2022-05-20 01:31:28.278 INFO 18890 --- [ scheduling-1] transform job request : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}, headers={file_originalFile=target/files/namino_family.csv, id=c404ecd5-4b5a-2593-dc12-13b0621b3b54, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888278}]
2022-05-20 01:31:28.365 INFO 18890 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}]
2022-05-20 01:31:28.448 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadFileToDatabaseStep]
2022-05-20 01:31:28.581 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [loadFileToDatabaseStep] executed in 132ms
2022-05-20 01:31:28.658 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [deleteFileStep]
2022-05-20 01:31:28.834 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [deleteFileStep] executed in 176ms
2022-05-20 01:31:28.939 INFO 18890 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}] and the following status: [COMPLETED] in 538ms
2022-05-20 01:31:28.940 INFO 18890 --- [ scheduling-1] job execution result : GenericMessage [payload=JobExecution: id=2, version=2, startTime=Fri May 20 01:31:28 JST 2022, endTime=Fri May 20 01:31:28 JST 2022, lastUpdated=Fri May 20 01:31:28 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=2, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=2, input.file.path=/path/to/batch-integration-example/target/files/namino_family.csv}], headers={file_originalFile=target/files/namino_family.csv, id=f50295b0-dd2b-5d49-6196-d61c5507b3d5, file_name=namino_family.csv, file_relativePath=namino_family.csv, timestamp=1652977888940}]
The loaded data:
mysql> select * from person;
+----+-----------+--------------+------+---------------------+
| id | last_name | first_name   | age  | updated_time        |
+----+-----------+--------------+------+---------------------+
|  1 | 磯野      | カツオ       |   11 | 2022-05-20 01:22:31 |
|  2 | 磯野      | ワカメ       |    9 | 2022-05-20 01:22:31 |
|  3 | フグ田    | サザエ       |   23 | 2022-05-20 01:22:31 |
|  4 | フグ田    | マスオ       |   32 | 2022-05-20 01:22:31 |
|  5 | フグ田    | タラオ       |    3 | 2022-05-20 01:22:31 |
|  6 | 波野      | ノリスケ     |   24 | 2022-05-20 01:31:29 |
|  7 | 波野      | タイコ       |   22 | 2022-05-20 01:31:29 |
|  8 | 波野      | イクラ       |    1 | 2022-05-20 01:31:29 |
+----+-----------+--------------+------+---------------------+
8 rows in set (0.00 sec)
Now let's copy the file that was loaded first once more.
$ cp src/main/resources/isono_family.csv target/files
Just as the first time, Spring Batch is launched.
2022-05-20 01:33:40.237 INFO 18890 --- [ scheduling-1] polling file : GenericMessage [payload=target/files/isono_family.csv, headers={file_originalFile=target/files/isono_family.csv, id=ef090587-8d4d-0932-7b89-62d2418d8391, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020237}]
2022-05-20 01:33:40.252 INFO 18890 --- [ scheduling-1] transform job request : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=f2603171-64a4-9093-0621-0125698264db, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020252}]
2022-05-20 01:33:40.320 INFO 18890 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=loadFileToDatabaseJob]] launched with the following parameters: [{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}]
2022-05-20 01:33:40.397 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadFileToDatabaseStep]
2022-05-20 01:33:40.504 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [loadFileToDatabaseStep] executed in 107ms
2022-05-20 01:33:40.571 INFO 18890 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [deleteFileStep]
2022-05-20 01:33:40.631 INFO 18890 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [deleteFileStep] executed in 60ms
2022-05-20 01:33:40.691 INFO 18890 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=loadFileToDatabaseJob]] completed with the following parameters: [{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}] and the following status: [COMPLETED] in 348ms
2022-05-20 01:33:40.692 INFO 18890 --- [ scheduling-1] job execution result : GenericMessage [payload=JobExecution: id=3, version=2, startTime=Fri May 20 01:33:40 JST 2022, endTime=Fri May 20 01:33:40 JST 2022, lastUpdated=Fri May 20 01:33:40 JST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=3, version=0, Job=[loadFileToDatabaseJob]], jobParameters=[{run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}], headers={file_originalFile=target/files/isono_family.csv, id=23cb8db4-88cd-5bfb-6557-aca9275a8b1f, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020691}]
Because the Job is configured with a RunIdIncrementer, run.id is incremented on each launch, so the Job starts without problems even though the file path passed to it is the same.
2022-05-20 01:33:40.252 INFO 18890 --- [ scheduling-1] transform job request : GenericMessage [payload=JobLaunchRequest: loadFileToDatabaseJob, parameters={run.id=3, input.file.path=/path/to/batch-integration-example/target/files/isono_family.csv}, headers={file_originalFile=target/files/isono_family.csv, id=f2603171-64a4-9093-0621-0125698264db, file_name=isono_family.csv, file_relativePath=isono_family.csv, timestamp=1652978020252}]
If the Job were launched with the same JobParameters and no RunIdIncrementer, the Spring Batch run would fail, because a JobInstance with those parameters has already completed.
Let's confirm that updated_time has been updated for the same data.
mysql> select * from person;
+----+-----------+--------------+------+---------------------+
| id | last_name | first_name   | age  | updated_time        |
+----+-----------+--------------+------+---------------------+
|  1 | 磯野      | カツオ       |   11 | 2022-05-20 01:33:40 |
|  2 | 磯野      | ワカメ       |    9 | 2022-05-20 01:33:40 |
|  3 | フグ田    | サザエ       |   23 | 2022-05-20 01:33:40 |
|  4 | フグ田    | マスオ       |   32 | 2022-05-20 01:33:40 |
|  5 | フグ田    | タラオ       |    3 | 2022-05-20 01:33:40 |
|  6 | 波野      | ノリスケ     |   24 | 2022-05-20 01:31:29 |
|  7 | 波野      | タイコ       |   22 | 2022-05-20 01:31:29 |
|  8 | 波野      | イクラ       |    1 | 2022-05-20 01:31:29 |
+----+-----------+--------------+------+---------------------+
8 rows in set (0.00 sec)
Looks good.
With that, I was able to confirm that the combination of Spring Batch and Spring Integration works.
Bonus: what if JsrJobParametersConverter is used instead of RunIdIncrementer?
I used RunIdIncrementer this time, but you might wonder whether JsrJobParametersConverter could be used instead.
That is, comment out the RunIdIncrementer
@Bean
public Job loadFileToDatabaseJob(JobBuilderFactory jobBuilderFactory) {
    return jobBuilderFactory
            .get("loadFileToDatabaseJob")
            // .incrementer(new RunIdIncrementer())
            .start(loadFileToDatabaseStep(null))
            .next(deleteFileStep(null))
            .build();
}
and define a JsrJobParametersConverter as a bean; wouldn't that do just as well?
@Bean
public JsrJobParametersConverter jsrJobParametersConverter(DataSource dataSource) {
    return new JsrJobParametersConverter(dataSource);
}
This does not work.
A JsrJobParametersConverter defined this way is only picked up when Spring Batch is launched through JobLauncherApplicationRunner.
Alternatively, you could use JsrJobOperator.
It's not that the Job can't be run through Spring Batch Integration, though…
Summary
I tried out Spring Batch Integration, which combines Spring Batch and Spring Integration.
Because there is little documentation, wiring Spring Batch and Spring Integration together took some effort, but once past that point it felt easy to use as long as you know both Spring Batch and Spring Integration.
The unit of a Message obtained through a Spring Integration Inbound Channel Adapter becomes the unit of a Spring Batch Job launch.
If that granularity suits your use case, Spring Integration's features become available, which may be a nice way to widen the range of what you can express with Spring Batch.
It seems worth remembering.