CLOVER🍀

That was when it all began.

jBatchのSplit/FlowでStepを並列実行する

jBatchで、SplitとFlowという要素を使用すると、ジョブ内の処理を並列実行できるそうです。

Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築

Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築

こちらを試してみましょう。

どうやら、Job XMLにsplitという要素を書いて子要素としてflowを配置すると、flowごとに並列実行される模様です。例えば、以下の様に書くとflow-1とflow-2が並列実行されます、と。

<job id="job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <split id="..." next="...">
        <flow id="flow-1">
            <step id="...">
                ...
            </step>
        </flow>
        <flow id="flow-2">
            <step id="...">
                ...
            </step>
        </flow>
    </split>

各id属性は、ジョブ内で一意である必要があるっぽいですが…。

Splitの子要素内でエラーが発生する(例外がスローされる)と、Splitが異常終了した扱いになるようです。

However, the implementor must be aware that a split may have a child flow where the flow itself or a flow’s child (step, decision, etc.) causes the job execution to terminate.
This could be via an end, stop, or fail transition element, or via an unhandled exception.

In such a case the job should then cease execution before transitioning past the current, containing split, on to the next execution element.

8.4.1 Split Termination Processing - Incomplete

とりあえず、使ってみます。jBatchの実装は、JBeretとします。

準備

ビルド定義は、以下の様に。
build.sbt

name := "jbatch-split-flow"

version := "0.0.1-SNAPSHOT"

organization := "org.littlewings"

scalaVersion := "2.11.8"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

libraryDependencies ++= Seq(
  // jBatch
  "org.jboss.spec.javax.batch" % "jboss-batch-api_1.0_spec" % "1.0.0.Final",
  "org.jberet" % "jberet-se" % "1.2.0.Final",
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.0.Final" % "runtime",
  "org.jboss.weld.se" % "weld-se" % "2.3.3.Final" % "runtime",
  "org.jboss" % "jandex" % "2.0.2.Final" % "runtime",
  "org.wildfly.security" % "wildfly-security-manager" % "1.1.2.Final" % "runtime",
  "org.jboss.marshalling" % "jboss-marshalling" % "1.4.10.Final" % "runtime",

  // Logging
  "org.jboss.logging" % "jboss-logging" % "3.3.0.Final",

  // Test
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

CDIも利用するので、beans.xmlも定義。
src/main/resources/META-INF/beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
                           http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
       bean-discovery-mode="annotated">
</beans>

Job XMLとBatchlet

それでは、先にJob XMLを記述します。以下のようなジョブを定義しました。

  • 最初にSplit/Flowを設け、Flowは3つとする
  • Flow内のStepは、スリープおよびログ出力するBatchletをひとつ配置して並列実行されていることを確認する
  • Batchletは、パラメーターで異常終了(例外をスロー)させることができるようにする
  • Splitの後続には、通過したことを確認するためのStep/Batchletを配置する

で、こんな感じに。
src/main/resources/META-INF/batch-jobs/job.xml

<?xml version="1.0" encoding="UTF-8"?>
<job id="job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <split id="first-split" next="last-step">
        <flow id="flow-1">
            <step id="flow-1-step">
                <batchlet ref="myBatchlet">
                    <properties>
                        <property name="batchName" value="Step1Batch"/>
                        <property name="thrownException" value="#{jobParameters['thrownException-1']}"/>
                    </properties>
                </batchlet>
            </step>
        </flow>
        <flow id="flow-2">
            <step id="flow-2-step">
                <batchlet ref="myBatchlet">
                    <properties>
                        <property name="batchName" value="Step2Batch"/>
                        <property name="thrownException" value="#{jobParameters['thrownException-2']}"/>
                    </properties>
                </batchlet>
            </step>
        </flow>

        <flow id="flow-3">
            <step id="flow-3-step">
                <batchlet ref="myBatchlet">
                    <properties>
                        <property name="batchName" value="Step3Batch"/>
                        <property name="thrownException" value="#{jobParameters['thrownException-3']}"/>
                    </properties>
                </batchlet>
            </step>
        </flow>
    </split>

    <step id="last-step">
        <batchlet ref="lastBatchlet"/>
    </step>
</job>

今回はsplitにnext属性を定義しましたが

    <split id="first-split" next="last-step">

こちらはオプションなので、このSplitでジョブが終了するのであればなくてもOKです。

続いて、Batchletの実装へ。Splitで使われるBatchletから。
src/main/scala/org/littlewings/javaee7/batch/MyBatchlet.scala

package org.littlewings.javaee7.batch

import java.util.concurrent.TimeUnit
import javax.batch.api.{AbstractBatchlet, BatchProperty}
import javax.batch.runtime.context.StepContext
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

import org.jboss.logging.Logger

@Dependent
@Named
class MyBatchlet extends AbstractBatchlet {
  val logger: Logger = Logger.getLogger(getClass)

  @Inject
  @BatchProperty
  var batchName: String = _

  @Inject
  @BatchProperty
  var thrownException: Boolean = _

  @Inject
  var stepContext: StepContext = _

  override def process(): String = {
    logger.infof("[%s] batchName = %s, stepName = %s start.", Array(Thread.currentThread.getName, batchName, stepContext.getStepName): _*)

    TimeUnit.SECONDS.sleep(3L)

    if (thrownException) {
      throw new RuntimeException("Oops!!")
    }

    logger.infof("[%s] batchName = %s, stepName = %s end.", Array(Thread.currentThread.getName, batchName, stepContext.getStepName): _*)

    "STEP-FINISH"
  }
}

スレッド名やパラメーターなどをログ出力します。

また、thrownExceptionをtrueにすると、例外を投げます。こちらを使って、Split中でのジョブの失敗を起こさせるようにします。

最後のStepに配置している、Batchlet。
src/main/scala/org/littlewings/javaee7/batch/LastBatchlet.scala

package org.littlewings.javaee7.batch

import javax.batch.api.AbstractBatchlet
import javax.batch.runtime.context.JobContext
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

@Dependent
@Named
class LastBatchlet extends AbstractBatchlet {
  @Inject
  var jobContext: JobContext = _

  override def process(): String = {
    jobContext.setExitStatus("JOB-FINISH")

    "FINISH"
  }
}

このBatchletが呼び出されたことを確認するために、JobContext#setExitStatusに値を設定しています。

確認

それでは、テストコードを書いて確認してみます。

テストコードの雛形は、こちら。
src/test/scala/org/littlewings/javaee7/batch/SplitFlowSpec.scala

package org.littlewings.javaee7.batch

import java.util.concurrent.TimeUnit
import javax.batch.runtime.{BatchStatus, BatchRuntime}

import org.jberet.runtime.JobExecutionImpl
import org.scalatest.{Matchers, FunSpec}

class SplitFlowSpec extends FunSpec with Matchers {
  describe("Split Flow Spec") {
    val jobOperator = BatchRuntime.getJobOperator

    // ここに、テストを書く!
  }
}

JobOperatorの取得は、テスト中1回のみとします。

まずは成功するジョブから。

    it("split-success") {
      val jobId = "job"
      val properties = new java.util.Properties

      val executionId = jobOperator.start(jobId, properties)

      val jobExecution = jobOperator.getJobExecution(executionId)
      jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS)

      jobExecution.getBatchStatus should be(BatchStatus.COMPLETED)
      jobExecution.getExitStatus should be("JOB-FINISH")
    }

結果としてはジョブが完了し、最後のStepにあるBatchletで設定したExit Statusが取得できています。

      jobExecution.getBatchStatus should be(BatchStatus.COMPLETED)
      jobExecution.getExitStatus should be("JOB-FINISH")

実行時のログは、こんな感じ。

3 15, 2016 10:29:31 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: [jberet-3] batchName = Step2Batch, stepName = flow-2-step start.
3 15, 2016 10:29:31 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: [jberet-4] batchName = Step3Batch, stepName = flow-3-step start.
3 15, 2016 10:29:31 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: [jberet-2] batchName = Step1Batch, stepName = flow-1-step start.
3 15, 2016 10:29:34 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: [jberet-3] batchName = Step2Batch, stepName = flow-2-step end.
3 15, 2016 10:29:34 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: [jberet-4] batchName = Step3Batch, stepName = flow-3-step end.
3 15, 2016 10:29:34 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: [jberet-2] batchName = Step1Batch, stepName = flow-1-step end.
3 15, 2016 10:29:34 午後 org.littlewings.javaee7.batch.MyBatchlet process

スレッド3つで、並列実行されていますね。

続いて、失敗するジョブへ。

    it("split-fail") {
      val jobId = "job"
      val properties = new java.util.Properties
      properties.setProperty("thrownException-3", "true")

      val executionId = jobOperator.start(jobId, properties)

      val jobExecution = jobOperator.getJobExecution(executionId)
      jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS)

      jobExecution.getBatchStatus should be(BatchStatus.FAILED)
      jobExecution.getExitStatus should be("FAILED")
    }

Split内の、3つ目のFlowのBatchletにコケてもらいます。

ジョブとしては異常終了となり、最後のStepにあるBatchletは実行されていません。

      jobExecution.getBatchStatus should be(BatchStatus.FAILED)
      jobExecution.getExitStatus should be("FAILED")

実行ログは、こんな感じです。

3 15, 2016 10:29:34 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: [jberet-2] batchName = Step1Batch, stepName = flow-1-step start.
3 15, 2016 10:29:34 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: [jberet-4] batchName = Step2Batch, stepName = flow-2-step start.
3 15, 2016 10:29:34 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: [jberet-3] batchName = Step3Batch, stepName = flow-3-step start.
3 15, 2016 10:29:37 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: [jberet-2] batchName = Step1Batch, stepName = flow-1-step end.
3 15, 2016 10:29:37 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: [jberet-4] batchName = Step2Batch, stepName = flow-2-step end.
3 15, 2016 10:29:37 午後 org.jberet.runtime.runner.BatchletRunner run
WARN: JBERET000001: Failed to run batchlet org.jberet.job.model.RefArtifact@4790f49e
java.lang.RuntimeException: Oops!!
	at org.littlewings.javaee7.batch.MyBatchlet.process(MyBatchlet.scala:33)
	at org.jberet.runtime.runner.BatchletRunner.run(BatchletRunner.java:72)
	at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:222)
	at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:140)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88)
	at org.jberet.runtime.runner.FlowExecutionRunner.run(FlowExecutionRunner.java:48)
	at org.jberet.spi.JobExecutor$1.run(JobExecutor.java:99)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

3つ目のBatchletのみ、失敗です。

まとめ

SplitおよびFlowを使って、ジョブ内の処理を並列実行させてみるとの、Flow内で例外を起こした場合の挙動を確認してみました。

とりあえず、Stepやidによる遷移が分かっていると、そう迷わない気がします。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jbatch-split-flow