jBatchで、SplitとFlowという要素を使用すると、ジョブ内の処理を並列実行できるそうです。

Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築
- 作者: 寺田佳央,猪瀬淳,加藤田益嗣,羽生田恒永,梶浦美咲,小田圭二
- 出版社/メーカー: 翔泳社
- 発売日: 2015/12/16
- メディア: 大型本
- この商品を含むブログ (7件) を見る
こちらを試してみましょう。
どうやら、Job XMLにsplitという要素を書いて子要素としてflowを配置すると、flowごとに並列実行される模様です。例えば、以下の様に書くとflow-1とflow-2が並列実行されます、と。
<job id="job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <split id="..." next="..."> <flow id="flow-1"> <step id="..."> ... </step> </flow> <flow id="flow-2"> <step id="..."> ... </step> </flow> </split>
各id属性は、ジョブ内で一意である必要があるっぽいですが…。
Splitの子要素内でエラーが発生する(例外がスローされる)と、Splitが異常終了した扱いになるようです。
However, the implementor must be aware that a split may have a child flow where the flow itself or a flow’s child (step, decision, etc.) causes the job execution to terminate.
This could be via an end, stop, or fail transition element, or via an unhandled exception.In such a case the job should then cease execution before transitioning past the current, containing split, on to the next execution element.
8.4.1 Split Termination Processing - Incomplete
とりあえず、使ってみます。jBatchの実装は、JBeretとします。
準備
ビルド定義は、以下の様に。
build.sbt
name := "jbatch-split-flow" version := "0.0.1-SNAPSHOT" organization := "org.littlewings" scalaVersion := "2.11.8" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) libraryDependencies ++= Seq( // jBatch "org.jboss.spec.javax.batch" % "jboss-batch-api_1.0_spec" % "1.0.0.Final", "org.jberet" % "jberet-se" % "1.2.0.Final", "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.0.Final" % "runtime", "org.jboss.weld.se" % "weld-se" % "2.3.3.Final" % "runtime", "org.jboss" % "jandex" % "2.0.2.Final" % "runtime", "org.wildfly.security" % "wildfly-security-manager" % "1.1.2.Final" % "runtime", "org.jboss.marshalling" % "jboss-marshalling" % "1.4.10.Final" % "runtime", // Logging "org.jboss.logging" % "jboss-logging" % "3.3.0.Final", // Test "org.scalatest" %% "scalatest" % "2.2.6" % "test" )
CDIも利用するので、beans.xmlも定義。
src/main/resources/META-INF/beans.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd" bean-discovery-mode="annotated"> </beans>
Job XMLとBatchlet
それでは、先にJob XMLを記述します。以下のようなジョブを定義しました。
- 最初にSplit/Flowを設け、Flowは3つとする
- Flow内のStepは、スリープおよびログ出力するBatchletをひとつ配置して並列実行されていることを確認する
- Batchletは、パラメーターで異常終了(例外をスロー)させることができるようにする
- Splitの後続には、通過したことを確認するためのStep/Batchletを配置する
で、こんな感じに。
src/main/resources/META-INF/batch-jobs/job.xml
<?xml version="1.0" encoding="UTF-8"?> <job id="job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <split id="first-split" next="last-step"> <flow id="flow-1"> <step id="flow-1-step"> <batchlet ref="myBatchlet"> <properties> <property name="batchName" value="Step1Batch"/> <property name="thrownException" value="#{jobParameters['thrownException-1']}"/> </properties> </batchlet> </step> </flow> <flow id="flow-2"> <step id="flow-2-step"> <batchlet ref="myBatchlet"> <properties> <property name="batchName" value="Step2Batch"/> <property name="thrownException" value="#{jobParameters['thrownException-2']}"/> </properties> </batchlet> </step> </flow> <flow id="flow-3"> <step id="flow-3-step"> <batchlet ref="myBatchlet"> <properties> <property name="batchName" value="Step3Batch"/> <property name="thrownException" value="#{jobParameters['thrownException-3']}"/> </properties> </batchlet> </step> </flow> </split> <step id="last-step"> <batchlet ref="lastBatchlet"/> </step> </job>
今回はsplitにnext属性を定義しましたが
<split id="first-split" next="last-step">
こちらはオプションなので、このSplitでジョブが終了するのであればなくてもOKです。
続いて、Batchletの実装へ。Splitで使われるBatchletから。
src/main/scala/org/littlewings/javaee7/batch/MyBatchlet.scala
package org.littlewings.javaee7.batch import java.util.concurrent.TimeUnit import javax.batch.api.{AbstractBatchlet, BatchProperty} import javax.batch.runtime.context.StepContext import javax.enterprise.context.Dependent import javax.inject.{Inject, Named} import org.jboss.logging.Logger @Dependent @Named class MyBatchlet extends AbstractBatchlet { val logger: Logger = Logger.getLogger(getClass) @Inject @BatchProperty var batchName: String = _ @Inject @BatchProperty var thrownException: Boolean = _ @Inject var stepContext: StepContext = _ override def process(): String = { logger.infof("[%s] batchName = %s, stepName = %s start.", Array(Thread.currentThread.getName, batchName, stepContext.getStepName): _*) TimeUnit.SECONDS.sleep(3L) if (thrownException) { throw new RuntimeException("Oops!!") } logger.infof("[%s] batchName = %s, stepName = %s end.", Array(Thread.currentThread.getName, batchName, stepContext.getStepName): _*) "STEP-FINISH" } }
スレッド名やパラメーターなどをログ出力します。
また、thrownExceptionをtrueにすると、例外を投げます。こちらを使って、Split中でのジョブの失敗を起こさせるようにします。
最後のStepに配置している、Batchlet。
src/main/scala/org/littlewings/javaee7/batch/LastBatchlet.scala
package org.littlewings.javaee7.batch import javax.batch.api.AbstractBatchlet import javax.batch.runtime.context.JobContext import javax.enterprise.context.Dependent import javax.inject.{Inject, Named} @Dependent @Named class LastBatchlet extends AbstractBatchlet { @Inject var jobContext: JobContext = _ override def process(): String = { jobContext.setExitStatus("JOB-FINISH") "FINISH" } }
このBatchletが呼び出されたことを確認するために、JobContext#setExitStatusに値を設定しています。
確認
それでは、テストコードを書いて確認してみます。
テストコードの雛形は、こちら。
src/test/scala/org/littlewings/javaee7/batch/SplitFlowSpec.scala
package org.littlewings.javaee7.batch import java.util.concurrent.TimeUnit import javax.batch.runtime.{BatchStatus, BatchRuntime} import org.jberet.runtime.JobExecutionImpl import org.scalatest.{Matchers, FunSpec} class SplitFlowSpec extends FunSpec with Matchers { describe("Split Flow Spec") { val jobOperator = BatchRuntime.getJobOperator // ここに、テストを書く! } }
JobOperatorの取得は、テスト中1回のみとします。
まずは成功するジョブから。
it("split-success") { val jobId = "job" val properties = new java.util.Properties val executionId = jobOperator.start(jobId, properties) val jobExecution = jobOperator.getJobExecution(executionId) jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS) jobExecution.getBatchStatus should be(BatchStatus.COMPLETED) jobExecution.getExitStatus should be("JOB-FINISH") }
結果としてはジョブが完了し、最後のStepにあるBatchletで設定したExit Statusが取得できています。
jobExecution.getBatchStatus should be(BatchStatus.COMPLETED)
jobExecution.getExitStatus should be("JOB-FINISH")
実行時のログは、こんな感じ。
3 15, 2016 10:29:31 午後 org.littlewings.javaee7.batch.MyBatchlet process INFO: [jberet-3] batchName = Step2Batch, stepName = flow-2-step start. 3 15, 2016 10:29:31 午後 org.littlewings.javaee7.batch.MyBatchlet process INFO: [jberet-4] batchName = Step3Batch, stepName = flow-3-step start. 3 15, 2016 10:29:31 午後 org.littlewings.javaee7.batch.MyBatchlet process INFO: [jberet-2] batchName = Step1Batch, stepName = flow-1-step start. 3 15, 2016 10:29:34 午後 org.littlewings.javaee7.batch.MyBatchlet process INFO: [jberet-3] batchName = Step2Batch, stepName = flow-2-step end. 3 15, 2016 10:29:34 午後 org.littlewings.javaee7.batch.MyBatchlet process INFO: [jberet-4] batchName = Step3Batch, stepName = flow-3-step end. 3 15, 2016 10:29:34 午後 org.littlewings.javaee7.batch.MyBatchlet process INFO: [jberet-2] batchName = Step1Batch, stepName = flow-1-step end. 3 15, 2016 10:29:34 午後 org.littlewings.javaee7.batch.MyBatchlet process
スレッド3つで、並列実行されていますね。
続いて、失敗するジョブへ。
it("split-fail") { val jobId = "job" val properties = new java.util.Properties properties.setProperty("thrownException-3", "true") val executionId = jobOperator.start(jobId, properties) val jobExecution = jobOperator.getJobExecution(executionId) jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS) jobExecution.getBatchStatus should be(BatchStatus.FAILED) jobExecution.getExitStatus should be("FAILED") }
Split内の、3つ目のFlowのBatchletにコケてもらいます。
ジョブとしては異常終了となり、最後のStepにあるBatchletは実行されていません。
jobExecution.getBatchStatus should be(BatchStatus.FAILED)
jobExecution.getExitStatus should be("FAILED")
実行ログは、こんな感じです。
3 15, 2016 10:29:34 午後 org.littlewings.javaee7.batch.MyBatchlet process INFO: [jberet-2] batchName = Step1Batch, stepName = flow-1-step start. 3 15, 2016 10:29:34 午後 org.littlewings.javaee7.batch.MyBatchlet process INFO: [jberet-4] batchName = Step2Batch, stepName = flow-2-step start. 3 15, 2016 10:29:34 午後 org.littlewings.javaee7.batch.MyBatchlet process INFO: [jberet-3] batchName = Step3Batch, stepName = flow-3-step start. 3 15, 2016 10:29:37 午後 org.littlewings.javaee7.batch.MyBatchlet process INFO: [jberet-2] batchName = Step1Batch, stepName = flow-1-step end. 3 15, 2016 10:29:37 午後 org.littlewings.javaee7.batch.MyBatchlet process INFO: [jberet-4] batchName = Step2Batch, stepName = flow-2-step end. 3 15, 2016 10:29:37 午後 org.jberet.runtime.runner.BatchletRunner run WARN: JBERET000001: Failed to run batchlet org.jberet.job.model.RefArtifact@4790f49e java.lang.RuntimeException: Oops!! at org.littlewings.javaee7.batch.MyBatchlet.process(MyBatchlet.scala:33) at org.jberet.runtime.runner.BatchletRunner.run(BatchletRunner.java:72) at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:222) at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:140) at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164) at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88) at org.jberet.runtime.runner.FlowExecutionRunner.run(FlowExecutionRunner.java:48) at org.jberet.spi.JobExecutor$1.run(JobExecutor.java:99) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
3つ目のBatchletのみ、失敗です。
まとめ
SplitおよびFlowを使って、ジョブ内の処理を並列実行させてみるとの、Flow内で例外を起こした場合の挙動を確認してみました。
とりあえず、Stepやidによる遷移が分かっていると、そう迷わない気がします。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jbatch-split-flow