CLOVER🍀

That was when it all began.

jBatchで、Chunk、Decision、Batchletをつなげて遊んでみる

jBatchで、Chunk方式とBatchletを試してみたので、これらをつなげて遊んでみようかなと。また、間にDecisionも入れてみようかと。

こういうテーマで、プログラムを書いてみようと思います。

  • Chunk方式(ItemReader/ItemProcessor/ItemWriter)で、CSVを作る(CSVデータの保存先は、ファイルとメモリの2パターンを作る)
  • ItemWriterでStepContext#setExitStatusを設定する
  • ItemWriterでの設定結果を参照し、Deciderで後続の処理を行うかどうか判定する
  • 後続の処理を行う場合、CSVファイルをgzip圧縮して保存する
  • 後続の処理を行わない場合、異常終了とする

途中のデータの受け渡しなどは、プロパティやJobContextのTransientUserDataで行おうかと。

では、やってみます。jBatchの実装は、JBeretとします。

準備

ビルド定義から。
build.sbt

name := "jbatch-chunked-batchlet-decision"

version := "0.0.1-SNAPSHOT"

organization := "org.littlewings"

scalaVersion := "2.11.8"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

libraryDependencies ++= Seq(
  // jBatch
  "org.jboss.spec.javax.batch" % "jboss-batch-api_1.0_spec" % "1.0.0.Final",
  "org.jberet" % "jberet-se" % "1.2.0.Final",
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.0.Final" % "runtime",
  "org.jboss.weld.se" % "weld-se" % "2.3.3.Final" % "runtime",
  "org.jboss" % "jandex" % "2.0.2.Final" % "runtime",
  "org.wildfly.security" % "wildfly-security-manager" % "1.1.2.Final" % "runtime",
  "org.jboss.marshalling" % "jboss-marshalling" % "1.4.10.Final" % "runtime",

  // Logging
  "org.jboss.logging" % "jboss-logging" % "3.3.0.Final",

  // Test
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

お決まりとして、CDIの設定も用意。
src/main/resources/META-INF/beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
                           http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
       bean-discovery-mode="annotated">
</beans>

Job XMLの作成

先に、Job XMLを載せておきます。冒頭に書いたとおり、今回は2種類のジョブを作成します。

まずは、ジョブのプロパティでステップ間のやり取りをするパターン。
src/main/resources/META-INF/batch-jobs/properties-job.xml

<?xml version="1.0" encoding="UTF-8"?>
<job id="properties-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <properties>
        <property name="outputCsvFilePath" value="#{jobParameters['csvFilePath']}"/>
        <property name="outputArchiveCsvFilePath" value="#{jobParameters['archiveCsvFilePath']}"/>
    </properties>

    <step id="write-csv" next="decision">
        <properties>
            <property name="writeStepStatus" value="#{jobParameters['writeStepStatus']}"/>
        </properties>
        <chunk item-count="4">
            <reader ref="languagesItemReader"/>
            <processor ref="languagesItemProcessor"/>
            <writer ref="propertiesCsvItemWriter"/>
        </chunk>
    </step>

    <decision id="decision" ref="csvDecider">
        <next on="DECISION-SUCCESS" to="archive"/>
        <fail on="*" exit-status="DECISION-FAILED-END"/>
    </decision>

    <step id="archive">
        <batchlet ref="propertiesCsvArchiveBatchlet"/>
    </step>
</job>

ChunkとBatchletの間にDecisionが配置され、作成したDeciderが「DECISION-SUCCESS」を返すとBatchletへ進み、そうでなければ失敗するようにしています。

    <decision id="decision" ref="csvDecider">
        <next on="DECISION-SUCCESS" to="archive"/>
        <fail on="*" exit-status="DECISION-FAILED-END"/>
    </decision>

あとは、プロパティとしてCSVファイルおよびgzipファイルのパス、そしてChunkの部分にはStepContext#setExitStatusをプロパティとして外部から受け取れるように定義しています。

もうひとつは、作成したCSVをメモリ中(JobContext#setTransientUserData)で受け渡しするパターン。先ほどのJob XMLとはCSVファイルのパスがプロパティからなくなり、ItemWriterとBatchletのクラスを変えています。
src/main/resources/META-INF/batch-jobs/user-data-job.xml

<?xml version="1.0" encoding="UTF-8"?>
<job id="properties-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <properties>
        <property name="outputArchiveCsvFilePath" value="#{jobParameters['archiveCsvFilePath']}"/>
    </properties>

    <step id="write-csv" next="decision">
        <properties>
            <property name="writeStepStatus" value="#{jobParameters['writeStepStatus']}"/>
        </properties>
        <chunk item-count="4">
            <reader ref="languagesItemReader"/>
            <processor ref="languagesItemProcessor"/>
            <writer ref="userDataCsvItemWriter"/>
        </chunk>
    </step>

    <decision id="decision" ref="csvDecider">
        <next on="DECISION-SUCCESS" to="archive"/>
        <fail on="*" exit-status="DECISION-FAILED-END"/>
    </decision>

    <step id="archive">
        <batchlet ref="userDataCsvArchiveBatchlet"/>
    </step>
</job>

それでは、実装の方へ。

Chunk側の実装

ItemReader。プログラミング言語10個を、順々に返すだけ。
src/main/scala/org/littlewings/javaee7/batch/LanguagesItemReader.scala

package org.littlewings.javaee7.batch

import javax.batch.api.chunk.AbstractItemReader
import javax.enterprise.context.Dependent
import javax.inject.Named

@Dependent
@Named
class LanguagesItemReader extends AbstractItemReader {
  private val languages: Iterator[String] =
    List("Java", "Scala", "Groovy", "Clojure", "Kotlin", "Perl", "Ruby", "Python", "PHP", "C").iterator

  override def readItem(): AnyRef =
    if (languages.hasNext) languages.next
    else null
}

ItemProcessor。もらったものを、そのまま返します。
src/main/scala/org/littlewings/javaee7/batch/LanguagesItemProcessor.scala

package org.littlewings.javaee7.batch

import javax.batch.api.chunk.ItemProcessor
import javax.enterprise.context.Dependent
import javax.inject.Named

@Dependent
@Named
class LanguagesItemProcessor extends ItemProcessor {
  override def processItem(item: AnyRef): AnyRef = item
}

ItemWriterは、2つ用意。ファイルとしてCSVを書き出すItemWriter。
src/main/scala/org/littlewings/javaee7/batch/PropertiesCsvItemWriter.scala

package org.littlewings.javaee7.batch

import java.io.BufferedWriter
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import javax.batch.api.chunk.AbstractItemWriter
import javax.batch.runtime.context.{JobContext, StepContext}
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

import scala.collection.JavaConverters._

@Dependent
@Named
class PropertiesCsvItemWriter extends AbstractItemWriter {
  @Inject
  private var jobContext: JobContext = _

  @Inject
  private var stepContext: StepContext = _

  private var writer: BufferedWriter = _

  override def open(checkpoint: java.io.Serializable): Unit = {
    writer =
      Files.newBufferedWriter(Paths.get(jobContext.getProperties.getProperty("outputCsvFilePath")),
        StandardCharsets.UTF_8
      )
  }

  override def writeItems(items: java.util.List[AnyRef]): Unit = {
    items
      .asScala
      .foreach { case item: String =>
        writer.write(item)
        writer.newLine()
      }

    stepContext.setExitStatus(stepContext.getProperties.getProperty("writeStepStatus"))
  }

  override def close(): Unit = writer.close()
}

StepContextのExitStatusとして、プロパティで受け取った内容を設定するようにしています。

    stepContext.setExitStatus(stepContext.getProperties.getProperty("writeStepStatus"))

次に、CSV用のデータをJobContextのTransientUserDataとして持つ方。
src/main/scala/org/littlewings/javaee7/batch/UserDataCsvItemWriter.scala

package org.littlewings.javaee7.batch

import javax.batch.api.chunk.AbstractItemWriter
import javax.batch.runtime.context.{StepContext, JobContext}
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

@Dependent
@Named
class UserDataCsvItemWriter extends AbstractItemWriter {
  @Inject
  private var jobContext: JobContext = _

  @Inject
  private var stepContext: StepContext = _

  private val buffer: mutable.ArrayBuffer[String] = new ArrayBuffer[String]

  override def writeItems(items: java.util.List[AnyRef]): Unit = {
    items
      .asScala
      .foreach { case item: String => buffer += item }

    stepContext.setExitStatus(stepContext.getProperties.getProperty("writeStepStatus"))
  }

  override def close(): Unit = jobContext.setTransientUserData(buffer)
}

StepContextのExitStatusを設定するところは同じです。

Decider

ChunkからBatchletに、進むかどうかを判定するためのDecider。
src/main/scala/org/littlewings/javaee7/batch/CsvDecider.scala

package org.littlewings.javaee7.batch

import javax.batch.api.Decider
import javax.batch.runtime.StepExecution
import javax.enterprise.context.Dependent
import javax.inject.Named

@Dependent
@Named
class CsvDecider extends Decider {
  override def decide(executions: Array[StepExecution]): String = {
    executions.foreach(_.getExitStatus)
    if (executions.filterNot(_.getExitStatus == "SUCCESS").isEmpty) "DECISION-SUCCESS"
    else "DECISION-ERROR"
  }
}

ここでは、前のChunkでのStepContext#setExitStatusで、「SUCCESS」と設定されていれば次へ進むようになります。

Job XMLのこの部分が効くことになります。

    <decision id="decision" ref="csvDecider">
        <next on="DECISION-SUCCESS" to="archive"/>
        <fail on="*" exit-status="DECISION-FAILED-END"/>
    </decision>

Batchlet

最後は、Batchlet。こちらも2つ用意ですが、Chunk側で作成したCSVデータをgzipしてファイル出力します。

CSVファイルを読み込んで、gzip圧縮するBatchlet。
src/main/scala/org/littlewings/javaee7/batch/PropertiesCsvArchiveBatchlet.scala

package org.littlewings.javaee7.batch

import java.io.{BufferedInputStream, BufferedOutputStream}
import java.nio.file.{Files, Paths}
import java.util.zip.GZIPOutputStream
import javax.batch.api.AbstractBatchlet
import javax.batch.runtime.context.JobContext
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

@Dependent
@Named
class PropertiesCsvArchiveBatchlet extends AbstractBatchlet {
  @Inject
  private var jobContext: JobContext = _

  override def process(): String = {
    val outputCsvFilePath = Paths.get(jobContext.getProperties.getProperty("outputCsvFilePath"))
    val outputArchiveCsvFilePath = Paths.get(jobContext.getProperties.getProperty("outputArchiveCsvFilePath"))

    val is = new BufferedInputStream(Files.newInputStream(outputCsvFilePath))
    val os = new GZIPOutputStream(new BufferedOutputStream(Files.newOutputStream(outputArchiveCsvFilePath)))

    try {
      Iterator
        .continually(is.read())
        .takeWhile(_ > -1)
        .foreach(os.write)

      os.finish()
    } finally {
      is.close()
      os.close()
    }

    jobContext.setExitStatus("BATCHLET-SUCCESS")

    "COMPLETED"
  }
}

JobContextのTransientUserDataで受け取ったデータを、CSVファイルかつgzipして出力するBatchlet。
src/main/scala/org/littlewings/javaee7/batch/UserDataCsvArchiveBatchlet.scala

package org.littlewings.javaee7.batch

import java.io.BufferedOutputStream
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util.zip.GZIPOutputStream
import javax.batch.api.AbstractBatchlet
import javax.batch.runtime.context.JobContext
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

import scala.collection.mutable.ArrayBuffer

@Dependent
@Named
class UserDataCsvArchiveBatchlet extends AbstractBatchlet {
  @Inject
  private var jobContext: JobContext = _

  override def process(): String = {
    val contentsBinary =
      jobContext
        .getTransientUserData
        .asInstanceOf[ArrayBuffer[String]]
        .mkString(System.lineSeparator)
        .getBytes(StandardCharsets.UTF_8)

    val outputArchiveCsvFilePath = Paths.get(jobContext.getProperties.getProperty("outputArchiveCsvFilePath"))
    val os = new GZIPOutputStream(new BufferedOutputStream(Files.newOutputStream(outputArchiveCsvFilePath)))

    try {
      os.write(contentsBinary)
      os.finish()
    } finally {
      os.close()
    }

    jobContext.setExitStatus("BATCHLET-SUCCESS")

    "COMPLETE"
  }
}

どちらのBathletも、JobContextのExitStatusには「BATCHLET-SUCCESS」と設定するようにしました。

テスト

それでは、テストコードを書いて動かしてみます。テストコードは、ScalaTestで書いています。
src/test/scala/org/littlewings/javaee7/batch/ChunkedBatchletSpec.scala

package org.littlewings.javaee7.batch

import java.nio.file.{Files, Paths}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.TimeUnit
import java.util.zip.GZIPInputStream
import javax.batch.runtime.{BatchRuntime, BatchStatus}

import org.jberet.runtime.JobExecutionImpl
import org.scalatest.{FunSpec, Matchers}

import scala.io.Source

class ChunkedBatchletSpec extends FunSpec with Matchers {
  describe("Chunked - Batchlet Spec") {
    val jobOperator = BatchRuntime.getJobOperator

    // ここに、テストを書く!
  }
}

最初にこのコードがあるのですが

    val jobOperator = BatchRuntime.getJobOperator

こちらを複数回呼び出すと、Weldがエラーになるので1回の記述にしました。Java SE環境だからでしょうねぇ…。

では、テストの方を。

CSVファイル出力、パスはプロパティで設定するパターン。

    it("using Properties, Success") {
      val jobId = "properties-job"
      val now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"))
      val outputCsvFilePath = Paths.get(s"./target/languages-using-propeties-${now}.csv")
      val outputArchiveCsvFilePath = Paths.get(s"./target/languages-using-propeties-${now}.csv.gz")

      val properties = new java.util.Properties

      properties.put("csvFilePath", outputCsvFilePath.toString)
      properties.put("archiveCsvFilePath", outputArchiveCsvFilePath.toString)
      properties.put("writeStepStatus", "SUCCESS")

      val executionId = jobOperator.start(jobId, properties)

      val jobExecution = jobOperator.getJobExecution(executionId)
      jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS)

      jobExecution.getBatchStatus should be(BatchStatus.COMPLETED)
      jobExecution.getExitStatus should be("BATCHLET-SUCCESS")

      outputCsvFilePath.toFile should exist
      outputArchiveCsvFilePath.toFile should exist

      val source = Source.fromInputStream(new GZIPInputStream(Files.newInputStream(outputArchiveCsvFilePath)), "UTF-8")
      source.getLines.mkString(",") should be("Java,Scala,Groovy,Clojure,Kotlin,Perl,Ruby,Python,PHP,C")
      source.close()

      Files.delete(outputCsvFilePath)
      Files.delete(outputArchiveCsvFilePath)
    }

その失敗パターン。StepContextのExitStatusに、エラーとなるように文字列設定しています。

    it("using Properties, Fail") {
      val jobId = "properties-job"
      val now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"))
      val outputCsvFilePath = Paths.get(s"./target/languages-using-propeties-${now}.csv")
      val outputArchiveCsvFilePath = Paths.get(s"./target/languages-using-propeties-${now}.csv.gz")

      val properties = new java.util.Properties

      properties.put("csvFilePath", outputCsvFilePath.toString)
      properties.put("archiveCsvFilePath", outputArchiveCsvFilePath.toString)
      properties.put("writeStepStatus", "PARAMETER-STATUS-FAIL")

      val executionId = jobOperator.start(jobId, properties)

      val jobExecution = jobOperator.getJobExecution(executionId)
      jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS)

      jobExecution.getBatchStatus should be(BatchStatus.FAILED)
      jobExecution.getExitStatus should be("DECISION-FAILED-END")

      outputCsvFilePath.toFile should exist
      outputArchiveCsvFilePath.toFile should not be (exist)

      Files.delete(outputCsvFilePath)
    }

gzipファイルは作成されません。

JobContextのTransientUserDataで、データを受け渡すパターン。

    it("using UserData, Success") {
      val jobId = "user-data-job"
      val now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"))
      val outputArchiveCsvFilePath = Paths.get(s"./target/languages-using-propeties-${now}.csv.gz")

      val properties = new java.util.Properties

      properties.put("archiveCsvFilePath", outputArchiveCsvFilePath.toString)
      properties.put("writeStepStatus", "SUCCESS")

      val executionId = jobOperator.start(jobId, properties)

      val jobExecution = jobOperator.getJobExecution(executionId)
      jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS)

      jobExecution.getBatchStatus should be(BatchStatus.COMPLETED)
      jobExecution.getExitStatus should be("BATCHLET-SUCCESS")

      outputArchiveCsvFilePath.toFile should exist

      val source = Source.fromInputStream(new GZIPInputStream(Files.newInputStream(outputArchiveCsvFilePath)), "UTF-8")
      source.getLines.mkString(",") should be("Java,Scala,Groovy,Clojure,Kotlin,Perl,Ruby,Python,PHP,C")
      source.close()

      Files.delete(outputArchiveCsvFilePath)
    }

その失敗パターン。

    it("using UserData, Fail") {
      val jobId = "user-data-job"
      val now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"))
      val outputArchiveCsvFilePath = Paths.get(s"./target/languages-using-propeties-${now}.csv.gz")

      val properties = new java.util.Properties

      properties.put("archiveCsvFilePath", outputArchiveCsvFilePath.toString)
      properties.put("writeStepStatus", "PARAMETER-STATUS-FAIL")

      val executionId = jobOperator.start(jobId, properties)

      val jobExecution = jobOperator.getJobExecution(executionId)
      jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS)

      jobExecution.getBatchStatus should be(BatchStatus.FAILED)
      jobExecution.getExitStatus should be("DECISION-FAILED-END")

      outputArchiveCsvFilePath.toFile should not be (exist)
    }

とりあえず、なんとかつながったようです。

まとめ

Chunk、Decision、Batchletをつなげて、小さいバッチを作ってみました。

最初、各ContextのUserDataを覚えてみようかなと始めてみたのですが、若干方針転換してDecisionまで持ち出す形になりました…。まあ、今回はじめて触った各要素の勉強にはなったので、よいかなと。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jbatch-chunked-batchlet-decision