jBatchを見ていて、ちょっと気になったのがトランザクション管理。
いつ開始されて、いつコミットされるのでしょう?
日本語情報だと、このあたりに記載がありました。
どうも、Chunk方式の場合にトランザクション管理が行われるようです。
まあ、JSRを見るのがよさそうですね。
The Java Community Process(SM) Program - JSRs: Java Specification Requests - detail JSR# 352
JSR-352の、「11 Job Runtime Lifecycle」を見ると、Batchlet/Chunkの場合それぞれのジョブのライフサイクルが書かれていますが、ここで見てもトランザクション管理が行われているのは、Chunk方式みたいですね。
「11.6 Regular Chunk Processing」を見ると、通常のChunk方式では以下の感じみたいです。
- begin transaction
- ItemReader#open
- ItemWrite#open
- commit transaction
- itemがある限り繰り返し
- begin transaction
- ItemReader#readItem × N
- ItemProcessor#processItem
- ItemWriter#writeItems
- commit transaction
- begin transaction
- ItemWriter#close
- ItemReader#close
- commit transaction
※「11.7 Partitioned Chunk Processing」では、PartitionAnalyzerがあるとItemReader/ItemWriterのclose後にトランザクションが発生するようです
この挙動を、Bytemanでトレースして確認してみたいと思います。
準備
まずは、ビルドの定義。
build.sbt
name := "jbatch-chunked-with-tx" version := "0.0.1-SNAPSHOT" organization := "org.littlewings" scalaVersion := "2.11.7" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) fork in Test := true javaOptions in Test += "-javaagent:/path/to/byteman.jar=script:trace-tx.btm" libraryDependencies ++= Seq( // jBatch "org.jboss.spec.javax.batch" % "jboss-batch-api_1.0_spec" % "1.0.0.Final", "org.jberet" % "jberet-se" % "1.2.0.Final", "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.0.Final" % "runtime", "org.jboss.weld.se" % "weld-se" % "2.3.3.Final" % "runtime", "org.wildfly.security" % "wildfly-security-manager" % "1.1.2.Final" % "runtime", "org.jboss.marshalling" % "jboss-marshalling" % "1.4.10.Final" % "runtime", // Logging "org.jboss.logging" % "jboss-logging" % "3.3.0.Final", // Test "org.scalatest" %% "scalatest" % "2.2.6" % "test" )
sbtで実行するのですが、forkしてBytemanの設定を入れています。
fork in Test := true javaOptions in Test += "-javaagent:/path/to/byteman.jar=script:trace-tx.btm"
jBatcの実装はJBeret、SE環境で実行します。
ItemReader/ItemProcessor/ItemWriter
今回のサンプル用の、Chunk方式の各種実装。
ItemReader。言語のListをIteratorで回しているだけです。あとは、ログ出力ですね。
src/main/scala/org/littlewings/javaee7/batch/MyItemReader.scala
package org.littlewings.javaee7.batch import java.io.Serializable import javax.batch.api.chunk.ItemReader import javax.enterprise.context.Dependent import javax.inject.Named import org.jboss.logging.Logger @Dependent @Named("MyItemReader") class MyItemReader extends ItemReader { private val logger: Logger = Logger.getLogger(getClass) private val languages: Iterator[String] = List("Java", "Scala", "Groovy", "Clojure", "Kotlin", "Perl", "Ruby", "Python", "PHP", "C").iterator override def open(checkpoint: Serializable): Unit = logger.infof("open.") override def readItem(): AnyRef = { logger.infof("readItem.") if (languages.hasNext) languages.next else null } override def checkpointInfo(): Serializable = { logger.infof("checkpoint.") null } override def close(): Unit = logger.infof("close.") }
ItemProcessor。こちらは特に加工等しないので、そのままprocessItemの引数を返却しています。
src/main/scala/org/littlewings/javaee7/batch/MyItemProcessor.scala
package org.littlewings.javaee7.batch import javax.batch.api.chunk.ItemProcessor import javax.enterprise.context.Dependent import javax.inject.Named import org.jboss.logging.Logger @Dependent @Named("MyItemProcessor") class MyItemProcessor extends ItemProcessor { private val logger: Logger = Logger.getLogger(getClass) override def processItem(item: AnyRef): AnyRef = { logger.infof("processItem.") item } }
ItemWriter。こちらも、ログ出力のみです。
src/main/scala/org/littlewings/javaee7/batch/MyItemWriter.scala
package org.littlewings.javaee7.batch import java.io.Serializable import java.util import javax.batch.api.chunk.ItemWriter import javax.enterprise.context.Dependent import javax.inject.Named import org.jboss.logging.Logger @Dependent @Named("MyItemWriter") class MyItemWriter extends ItemWriter { private val logger: Logger = Logger.getLogger(getClass) override def open(checkpoint: Serializable): Unit = logger.infof("open.") override def writeItems(items: util.List[AnyRef]): Unit = logger.infof(s"writeItems[${items.size()}].") override def checkpointInfo(): Serializable = { logger.infof("checkpoint.") null } override def close(): Unit = logger.infof("close.") }
beans.xml/Job XML
CDIのための、beans.xml。
src/main/resources/META-INF/beans.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd" bean-discovery-mode="annotated"> </beans>
Job XML。item-countは、4としました。
src/main/resources/META-INF/batch-jobs/my-job.xml
<?xml version="1.0" encoding="UTF-8"?> <job id="my-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <step id="step"> <chunk item-count="4"> <reader ref="MyItemReader"/> <processor ref="MyItemProcessor"/> <writer ref="MyItemWriter"/> </chunk> </step> </job>
BytemanのRule
それでは、トランザクションをトレースするためのBytemanのRuleを書きます。
JBeretの実装を確認したところ、TransactionManagerでトランザクション管理しているようなので、こちらのbegin/commit時点でログ出力してみましょう。
trace-tx.btm
RULE trace TransactionManager.begin INTERFACE javax.transaction.TransactionManager METHOD begin AT ENTRY IF TRUE DO org.jboss.logging.Logger.getLogger($0.getClass()).info("***** TransactionManager#begin *****") ENDRULE RULE trace TransactionManager.commit INTERFACE javax.transaction.TransactionManager METHOD commit AT ENTRY IF TRUE DO org.jboss.logging.Logger.getLogger($0.getClass()).info("***** TransactionManager#commit *****") ENDRULE
ソースコード上では、このあたり。
Chunk処理中。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L296
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L335
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L381
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L539
readItem、processItem時に、ロールバックが必要になった場合。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L381
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L428
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L603
※PartitionAnalyzerがある場合
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/StepExecutionRunner.java#L346
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/StepExecutionRunner.java#L388
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/StepExecutionRunner.java#L405
テストコードを動かして確認
それでは、こんなテストコードを書いて動かしてみます。
src/test/scala/org/littlewings/javaee7/batch/TransactionTracingSpec.scala
package org.littlewings.javaee7.batch import org.jberet.se.Main import org.scalatest.{Matchers, FunSpec} class TransactionTracingSpec extends FunSpec with Matchers { describe("Transaction Tracing Spec") { it("run batch") { Main.main(Array("my-job")) } } }
実行。
$ sbt test
open時。
INFO: ***** TransactionManager#begin ***** 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader open INFO: open. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter open INFO: open. 3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: ***** TransactionManager#commit *****
item処理中。
3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: ***** TransactionManager#begin ***** 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem INFO: readItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: processItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem INFO: readItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: processItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem INFO: readItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: processItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem INFO: readItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: processItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems INFO: writeItems[4]. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader checkpointInfo INFO: checkpoint. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter checkpointInfo INFO: checkpoint. 3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: ***** TransactionManager#commit ***** 3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: ***** TransactionManager#begin ***** 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem INFO: readItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: processItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem INFO: readItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: processItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem INFO: readItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: processItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem INFO: readItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: processItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems INFO: writeItems[4]. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader checkpointInfo INFO: checkpoint. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter checkpointInfo INFO: checkpoint. 3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: ***** TransactionManager#commit ***** 3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: ***** TransactionManager#begin ***** 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem INFO: readItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: processItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem INFO: readItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: processItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem INFO: readItem. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems INFO: writeItems[2]. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader checkpointInfo INFO: checkpoint. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter checkpointInfo INFO: checkpoint. 3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: ***** TransactionManager#commit *****
close時。
3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: ***** TransactionManager#commit ***** 3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: ***** TransactionManager#begin ***** 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter close INFO: close. 3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader close INFO: close. 3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: ***** TransactionManager#commit *****
確かに、ライフサイクルのとおりのログが出力されましたね。
まとめ
ちょっと気になっていた、jBatchでのトランザクション管理の基本パターンを確認してみました。Partitioned Chunkについては確認していませんが、気が向けば…。
あと、Java SE環境でもJBeretがTransactionManagerを動かしているように見えますが、実際にはこれはほぼ何もしていないダミー?実装みたいです。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/tx/LocalTransactionManager.java
ステータスが変わっているだけっぽいので…。
とりあえず、挙動はなんとなくわかったのでOKとしましょう。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jbatch-chunked-with-tx