jBatchで、Chunk方式とBatchletを試してみたので、これらをつなげて遊んでみようかなと。また、間にDecisionも入れてみようかと。
こういうテーマで、プログラムを書いてみようと思います。
- Chunk方式(ItemReader/ItemProcessor/ItemWriter)で、CSVを作る(CSVデータの保存先は、ファイルとメモリの2パターンを作る)
- ItemWriterでStepContext#setExitStatusを設定する
- ItemWriterでの設定結果を参照し、Deciderで後続の処理を行うかどうか判定する
- 後続の処理を行う場合、CSVファイルをgzip圧縮して保存する
- 後続の処理を行わない場合、異常終了とする
途中のデータの受け渡しなどは、プロパティやJobContextのTransientUserDataで行おうかと。
では、やってみます。jBatchの実装は、JBeretとします。
準備
ビルド定義から。
build.sbt
name := "jbatch-chunked-batchlet-decision" version := "0.0.1-SNAPSHOT" organization := "org.littlewings" scalaVersion := "2.11.8" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) libraryDependencies ++= Seq( // jBatch "org.jboss.spec.javax.batch" % "jboss-batch-api_1.0_spec" % "1.0.0.Final", "org.jberet" % "jberet-se" % "1.2.0.Final", "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.0.Final" % "runtime", "org.jboss.weld.se" % "weld-se" % "2.3.3.Final" % "runtime", "org.jboss" % "jandex" % "2.0.2.Final" % "runtime", "org.wildfly.security" % "wildfly-security-manager" % "1.1.2.Final" % "runtime", "org.jboss.marshalling" % "jboss-marshalling" % "1.4.10.Final" % "runtime", // Logging "org.jboss.logging" % "jboss-logging" % "3.3.0.Final", // Test "org.scalatest" %% "scalatest" % "2.2.6" % "test" )
お決まりとして、CDIの設定も用意。
src/main/resources/META-INF/beans.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd" bean-discovery-mode="annotated"> </beans>
Job XMLの作成
先に、Job XMLを載せておきます。冒頭に書いたとおり、今回は2種類のジョブを作成します。
まずは、ジョブのプロパティでステップ間のやり取りをするパターン。
src/main/resources/META-INF/batch-jobs/properties-job.xml
<?xml version="1.0" encoding="UTF-8"?> <job id="properties-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <properties> <property name="outputCsvFilePath" value="#{jobParameters['csvFilePath']}"/> <property name="outputArchiveCsvFilePath" value="#{jobParameters['archiveCsvFilePath']}"/> </properties> <step id="write-csv" next="decision"> <properties> <property name="writeStepStatus" value="#{jobParameters['writeStepStatus']}"/> </properties> <chunk item-count="4"> <reader ref="languagesItemReader"/> <processor ref="languagesItemProcessor"/> <writer ref="propertiesCsvItemWriter"/> </chunk> </step> <decision id="decision" ref="csvDecider"> <next on="DECISION-SUCCESS" to="archive"/> <fail on="*" exit-status="DECISION-FAILED-END"/> </decision> <step id="archive"> <batchlet ref="propertiesCsvArchiveBatchlet"/> </step> </job>
ChunkとBatchletの間にDecisionが配置され、作成したDeciderが「DECISION-SUCCESS」を返すとBatchletへ進み、そうでなければ失敗するようにしています。
<decision id="decision" ref="csvDecider"> <next on="DECISION-SUCCESS" to="archive"/> <fail on="*" exit-status="DECISION-FAILED-END"/> </decision>
あとは、プロパティとしてCSVファイルおよびgzipファイルのパス、そしてChunkの部分にはStepContext#setExitStatusをプロパティとして外部から受け取れるように定義しています。
もうひとつは、作成したCSVをメモリ中(JobContext#setTransientUserData)で受け渡しするパターン。先ほどのJob XMLとはCSVファイルのパスがプロパティからなくなり、ItemWriterとBatchletのクラスを変えています。
src/main/resources/META-INF/batch-jobs/user-data-job.xml
<?xml version="1.0" encoding="UTF-8"?> <job id="properties-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <properties> <property name="outputArchiveCsvFilePath" value="#{jobParameters['archiveCsvFilePath']}"/> </properties> <step id="write-csv" next="decision"> <properties> <property name="writeStepStatus" value="#{jobParameters['writeStepStatus']}"/> </properties> <chunk item-count="4"> <reader ref="languagesItemReader"/> <processor ref="languagesItemProcessor"/> <writer ref="userDataCsvItemWriter"/> </chunk> </step> <decision id="decision" ref="csvDecider"> <next on="DECISION-SUCCESS" to="archive"/> <fail on="*" exit-status="DECISION-FAILED-END"/> </decision> <step id="archive"> <batchlet ref="userDataCsvArchiveBatchlet"/> </step> </job>
それでは、実装の方へ。
Chunk側の実装
ItemReader。プログラミング言語10個を、順々に返すだけ。
src/main/scala/org/littlewings/javaee7/batch/LanguagesItemReader.scala
package org.littlewings.javaee7.batch import javax.batch.api.chunk.AbstractItemReader import javax.enterprise.context.Dependent import javax.inject.Named @Dependent @Named class LanguagesItemReader extends AbstractItemReader { private val languages: Iterator[String] = List("Java", "Scala", "Groovy", "Clojure", "Kotlin", "Perl", "Ruby", "Python", "PHP", "C").iterator override def readItem(): AnyRef = if (languages.hasNext) languages.next else null }
ItemProcessor。もらったものを、そのまま返します。
src/main/scala/org/littlewings/javaee7/batch/LanguagesItemProcessor.scala
package org.littlewings.javaee7.batch import javax.batch.api.chunk.ItemProcessor import javax.enterprise.context.Dependent import javax.inject.Named @Dependent @Named class LanguagesItemProcessor extends ItemProcessor { override def processItem(item: AnyRef): AnyRef = item }
ItemWriterは、2つ用意。ファイルとしてCSVを書き出すItemWriter。
src/main/scala/org/littlewings/javaee7/batch/PropertiesCsvItemWriter.scala
package org.littlewings.javaee7.batch import java.io.BufferedWriter import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} import javax.batch.api.chunk.AbstractItemWriter import javax.batch.runtime.context.{JobContext, StepContext} import javax.enterprise.context.Dependent import javax.inject.{Inject, Named} import scala.collection.JavaConverters._ @Dependent @Named class PropertiesCsvItemWriter extends AbstractItemWriter { @Inject private var jobContext: JobContext = _ @Inject private var stepContext: StepContext = _ private var writer: BufferedWriter = _ override def open(checkpoint: java.io.Serializable): Unit = { writer = Files.newBufferedWriter(Paths.get(jobContext.getProperties.getProperty("outputCsvFilePath")), StandardCharsets.UTF_8 ) } override def writeItems(items: java.util.List[AnyRef]): Unit = { items .asScala .foreach { case item: String => writer.write(item) writer.newLine() } stepContext.setExitStatus(stepContext.getProperties.getProperty("writeStepStatus")) } override def close(): Unit = writer.close() }
StepContextのExitStatusとして、プロパティで受け取った内容を設定するようにしています。
stepContext.setExitStatus(stepContext.getProperties.getProperty("writeStepStatus"))
次に、CSV用のデータをJobContextのTransientUserDataとして持つ方。
src/main/scala/org/littlewings/javaee7/batch/UserDataCsvItemWriter.scala
package org.littlewings.javaee7.batch import javax.batch.api.chunk.AbstractItemWriter import javax.batch.runtime.context.{StepContext, JobContext} import javax.enterprise.context.Dependent import javax.inject.{Inject, Named} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @Dependent @Named class UserDataCsvItemWriter extends AbstractItemWriter { @Inject private var jobContext: JobContext = _ @Inject private var stepContext: StepContext = _ private val buffer: mutable.ArrayBuffer[String] = new ArrayBuffer[String] override def writeItems(items: java.util.List[AnyRef]): Unit = { items .asScala .foreach { case item: String => buffer += item } stepContext.setExitStatus(stepContext.getProperties.getProperty("writeStepStatus")) } override def close(): Unit = jobContext.setTransientUserData(buffer) }
StepContextのExitStatusを設定するところは同じです。
Decider
ChunkからBatchletに、進むかどうかを判定するためのDecider。
src/main/scala/org/littlewings/javaee7/batch/CsvDecider.scala
package org.littlewings.javaee7.batch import javax.batch.api.Decider import javax.batch.runtime.StepExecution import javax.enterprise.context.Dependent import javax.inject.Named @Dependent @Named class CsvDecider extends Decider { override def decide(executions: Array[StepExecution]): String = { executions.foreach(_.getExitStatus) if (executions.filterNot(_.getExitStatus == "SUCCESS").isEmpty) "DECISION-SUCCESS" else "DECISION-ERROR" } }
ここでは、前のChunkでのStepContext#setExitStatusで、「SUCCESS」と設定されていれば次へ進むようになります。
Job XMLのこの部分が効くことになります。
<decision id="decision" ref="csvDecider"> <next on="DECISION-SUCCESS" to="archive"/> <fail on="*" exit-status="DECISION-FAILED-END"/> </decision>
Batchlet
最後は、Batchlet。こちらも2つ用意ですが、Chunk側で作成したCSVデータをgzipしてファイル出力します。
CSVファイルを読み込んで、gzip圧縮するBatchlet。
src/main/scala/org/littlewings/javaee7/batch/PropertiesCsvArchiveBatchlet.scala
package org.littlewings.javaee7.batch import java.io.{BufferedInputStream, BufferedOutputStream} import java.nio.file.{Files, Paths} import java.util.zip.GZIPOutputStream import javax.batch.api.AbstractBatchlet import javax.batch.runtime.context.JobContext import javax.enterprise.context.Dependent import javax.inject.{Inject, Named} @Dependent @Named class PropertiesCsvArchiveBatchlet extends AbstractBatchlet { @Inject private var jobContext: JobContext = _ override def process(): String = { val outputCsvFilePath = Paths.get(jobContext.getProperties.getProperty("outputCsvFilePath")) val outputArchiveCsvFilePath = Paths.get(jobContext.getProperties.getProperty("outputArchiveCsvFilePath")) val is = new BufferedInputStream(Files.newInputStream(outputCsvFilePath)) val os = new GZIPOutputStream(new BufferedOutputStream(Files.newOutputStream(outputArchiveCsvFilePath))) try { Iterator .continually(is.read()) .takeWhile(_ > -1) .foreach(os.write) os.finish() } finally { is.close() os.close() } jobContext.setExitStatus("BATCHLET-SUCCESS") "COMPLETED" } }
JobContextのTransientUserDataで受け取ったデータを、CSVファイルかつgzipして出力するBatchlet。
src/main/scala/org/littlewings/javaee7/batch/UserDataCsvArchiveBatchlet.scala
package org.littlewings.javaee7.batch import java.io.BufferedOutputStream import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} import java.util.zip.GZIPOutputStream import javax.batch.api.AbstractBatchlet import javax.batch.runtime.context.JobContext import javax.enterprise.context.Dependent import javax.inject.{Inject, Named} import scala.collection.mutable.ArrayBuffer @Dependent @Named class UserDataCsvArchiveBatchlet extends AbstractBatchlet { @Inject private var jobContext: JobContext = _ override def process(): String = { val contentsBinary = jobContext .getTransientUserData .asInstanceOf[ArrayBuffer[String]] .mkString(System.lineSeparator) .getBytes(StandardCharsets.UTF_8) val outputArchiveCsvFilePath = Paths.get(jobContext.getProperties.getProperty("outputArchiveCsvFilePath")) val os = new GZIPOutputStream(new BufferedOutputStream(Files.newOutputStream(outputArchiveCsvFilePath))) try { os.write(contentsBinary) os.finish() } finally { os.close() } jobContext.setExitStatus("BATCHLET-SUCCESS") "COMPLETE" } }
どちらのBathletも、JobContextのExitStatusには「BATCHLET-SUCCESS」と設定するようにしました。
テスト
それでは、テストコードを書いて動かしてみます。テストコードは、ScalaTestで書いています。
src/test/scala/org/littlewings/javaee7/batch/ChunkedBatchletSpec.scala
package org.littlewings.javaee7.batch import java.nio.file.{Files, Paths} import java.time.LocalDateTime import java.time.format.DateTimeFormatter import java.util.concurrent.TimeUnit import java.util.zip.GZIPInputStream import javax.batch.runtime.{BatchRuntime, BatchStatus} import org.jberet.runtime.JobExecutionImpl import org.scalatest.{FunSpec, Matchers} import scala.io.Source class ChunkedBatchletSpec extends FunSpec with Matchers { describe("Chunked - Batchlet Spec") { val jobOperator = BatchRuntime.getJobOperator // ここに、テストを書く! } }
最初にこのコードがあるのですが
val jobOperator = BatchRuntime.getJobOperator
こちらを複数回呼び出すと、Weldがエラーになるので1回の記述にしました。Java SE環境だからでしょうねぇ…。
では、テストの方を。
CSVファイル出力、パスはプロパティで設定するパターン。
it("using Properties, Success") { val jobId = "properties-job" val now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")) val outputCsvFilePath = Paths.get(s"./target/languages-using-propeties-${now}.csv") val outputArchiveCsvFilePath = Paths.get(s"./target/languages-using-propeties-${now}.csv.gz") val properties = new java.util.Properties properties.put("csvFilePath", outputCsvFilePath.toString) properties.put("archiveCsvFilePath", outputArchiveCsvFilePath.toString) properties.put("writeStepStatus", "SUCCESS") val executionId = jobOperator.start(jobId, properties) val jobExecution = jobOperator.getJobExecution(executionId) jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS) jobExecution.getBatchStatus should be(BatchStatus.COMPLETED) jobExecution.getExitStatus should be("BATCHLET-SUCCESS") outputCsvFilePath.toFile should exist outputArchiveCsvFilePath.toFile should exist val source = Source.fromInputStream(new GZIPInputStream(Files.newInputStream(outputArchiveCsvFilePath)), "UTF-8") source.getLines.mkString(",") should be("Java,Scala,Groovy,Clojure,Kotlin,Perl,Ruby,Python,PHP,C") source.close() Files.delete(outputCsvFilePath) Files.delete(outputArchiveCsvFilePath) }
その失敗パターン。StepContextのExitStatusに、エラーとなるように文字列設定しています。
it("using Properties, Fail") { val jobId = "properties-job" val now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")) val outputCsvFilePath = Paths.get(s"./target/languages-using-propeties-${now}.csv") val outputArchiveCsvFilePath = Paths.get(s"./target/languages-using-propeties-${now}.csv.gz") val properties = new java.util.Properties properties.put("csvFilePath", outputCsvFilePath.toString) properties.put("archiveCsvFilePath", outputArchiveCsvFilePath.toString) properties.put("writeStepStatus", "PARAMETER-STATUS-FAIL") val executionId = jobOperator.start(jobId, properties) val jobExecution = jobOperator.getJobExecution(executionId) jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS) jobExecution.getBatchStatus should be(BatchStatus.FAILED) jobExecution.getExitStatus should be("DECISION-FAILED-END") outputCsvFilePath.toFile should exist outputArchiveCsvFilePath.toFile should not be (exist) Files.delete(outputCsvFilePath) }
gzipファイルは作成されません。
JobContextのTransientUserDataで、データを受け渡すパターン。
it("using UserData, Success") { val jobId = "user-data-job" val now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")) val outputArchiveCsvFilePath = Paths.get(s"./target/languages-using-propeties-${now}.csv.gz") val properties = new java.util.Properties properties.put("archiveCsvFilePath", outputArchiveCsvFilePath.toString) properties.put("writeStepStatus", "SUCCESS") val executionId = jobOperator.start(jobId, properties) val jobExecution = jobOperator.getJobExecution(executionId) jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS) jobExecution.getBatchStatus should be(BatchStatus.COMPLETED) jobExecution.getExitStatus should be("BATCHLET-SUCCESS") outputArchiveCsvFilePath.toFile should exist val source = Source.fromInputStream(new GZIPInputStream(Files.newInputStream(outputArchiveCsvFilePath)), "UTF-8") source.getLines.mkString(",") should be("Java,Scala,Groovy,Clojure,Kotlin,Perl,Ruby,Python,PHP,C") source.close() Files.delete(outputArchiveCsvFilePath) }
その失敗パターン。
it("using UserData, Fail") { val jobId = "user-data-job" val now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")) val outputArchiveCsvFilePath = Paths.get(s"./target/languages-using-propeties-${now}.csv.gz") val properties = new java.util.Properties properties.put("archiveCsvFilePath", outputArchiveCsvFilePath.toString) properties.put("writeStepStatus", "PARAMETER-STATUS-FAIL") val executionId = jobOperator.start(jobId, properties) val jobExecution = jobOperator.getJobExecution(executionId) jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS) jobExecution.getBatchStatus should be(BatchStatus.FAILED) jobExecution.getExitStatus should be("DECISION-FAILED-END") outputArchiveCsvFilePath.toFile should not be (exist) }
とりあえず、なんとかつながったようです。
まとめ
Chunk、Decision、Batchletをつなげて、小さいバッチを作ってみました。
最初、各ContextのUserDataを覚えてみようかなと始めてみたのですが、若干方針転換してDecisionまで持ち出す形になりました…。まあ、今回はじめて触った各要素の勉強にはなったので、よいかなと。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jbatch-chunked-batchlet-decision