CLOVER🍀

That was when it all began.

jBatch(Chunk方式)のトランザクションの開始/コミットタイミングを確認する

jBatchを見ていて、ちょっと気になったのがトランザクション管理。

いつ開始されて、いつコミットされるのでしょう?

日本語情報だと、このあたりに記載がありました。

Chunk方式のStepを使ってみる

Jbatch実践入門 #jdt2015

どうも、Chunk方式の場合にトランザクション管理が行われるようです。

まあ、JSRを見るのがよさそうですね。

The Java Community Process(SM) Program - JSRs: Java Specification Requests - detail JSR# 352

JSR-352の、「11 Job Runtime Lifecycle」を見ると、Batchlet/Chunkの場合それぞれのジョブのライフサイクルが書かれていますが、ここで見てもトランザクション管理が行われているのは、Chunk方式みたいですね。

「11.6 Regular Chunk Processing」を見ると、通常のChunk方式では以下の感じみたいです。

  • begin transaction
  • ItemReader#open
  • ItemWrite#open
  • commit transaction
  • itemがある限り繰り返し
    • begin transaction
    • ItemReader#readItem × N
      • ItemProcessor#processItem
    • ItemWriter#writeItems
    • commit transaction
  • begin transaction
  • ItemWriter#close
  • ItemReader#close
  • commit transaction

※「11.7 Partitioned Chunk Processing」では、PartitionAnalyzerがあるとItemReader/ItemWriterのclose後にトランザクションが発生するようです

この挙動を、Bytemanでトレースして確認してみたいと思います。

準備

まずは、ビルドの定義。
build.sbt

name := "jbatch-chunked-with-tx"

version := "0.0.1-SNAPSHOT"

organization := "org.littlewings"

scalaVersion := "2.11.7"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

fork in Test := true

javaOptions in Test += "-javaagent:/path/to/byteman.jar=script:trace-tx.btm"

libraryDependencies ++= Seq(
  // jBatch
  "org.jboss.spec.javax.batch" % "jboss-batch-api_1.0_spec" % "1.0.0.Final",
  "org.jberet" % "jberet-se" % "1.2.0.Final",
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.0.Final" % "runtime",
  "org.jboss.weld.se" % "weld-se" % "2.3.3.Final" % "runtime",
  "org.wildfly.security" % "wildfly-security-manager" % "1.1.2.Final" % "runtime",
  "org.jboss.marshalling" % "jboss-marshalling" % "1.4.10.Final" % "runtime",

  // Logging
  "org.jboss.logging" % "jboss-logging" % "3.3.0.Final",

  // Test
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

sbtで実行するのですが、forkしてBytemanの設定を入れています。

fork in Test := true

javaOptions in Test += "-javaagent:/path/to/byteman.jar=script:trace-tx.btm"

jBatcの実装はJBeret、SE環境で実行します。

ItemReader/ItemProcessor/ItemWriter

今回のサンプル用の、Chunk方式の各種実装。

ItemReader。言語のListをIteratorで回しているだけです。あとは、ログ出力ですね。
src/main/scala/org/littlewings/javaee7/batch/MyItemReader.scala

package org.littlewings.javaee7.batch

import java.io.Serializable
import javax.batch.api.chunk.ItemReader
import javax.enterprise.context.Dependent
import javax.inject.Named

import org.jboss.logging.Logger

@Dependent
@Named("MyItemReader")
class MyItemReader extends ItemReader {
  private val logger: Logger = Logger.getLogger(getClass)

  private val languages: Iterator[String] =
    List("Java", "Scala", "Groovy", "Clojure", "Kotlin", "Perl", "Ruby", "Python", "PHP", "C").iterator

  override def open(checkpoint: Serializable): Unit =
    logger.infof("open.")

  override def readItem(): AnyRef = {
    logger.infof("readItem.")

    if (languages.hasNext) languages.next
    else null
  }

  override def checkpointInfo(): Serializable = {
    logger.infof("checkpoint.")
    null
  }

  override def close(): Unit =
    logger.infof("close.")
}

ItemProcessor。こちらは特に加工等しないので、そのままprocessItemの引数を返却しています。
src/main/scala/org/littlewings/javaee7/batch/MyItemProcessor.scala

package org.littlewings.javaee7.batch

import javax.batch.api.chunk.ItemProcessor
import javax.enterprise.context.Dependent
import javax.inject.Named

import org.jboss.logging.Logger

@Dependent
@Named("MyItemProcessor")
class MyItemProcessor extends ItemProcessor {
  private val logger: Logger = Logger.getLogger(getClass)

  override def processItem(item: AnyRef): AnyRef = {
    logger.infof("processItem.")
    item
  }
}

ItemWriter。こちらも、ログ出力のみです。
src/main/scala/org/littlewings/javaee7/batch/MyItemWriter.scala

package org.littlewings.javaee7.batch

import java.io.Serializable
import java.util
import javax.batch.api.chunk.ItemWriter
import javax.enterprise.context.Dependent
import javax.inject.Named

import org.jboss.logging.Logger

@Dependent
@Named("MyItemWriter")
class MyItemWriter extends ItemWriter {
  private val logger: Logger = Logger.getLogger(getClass)

  override def open(checkpoint: Serializable): Unit =
    logger.infof("open.")

  override def writeItems(items: util.List[AnyRef]): Unit =
    logger.infof(s"writeItems[${items.size()}].")

  override def checkpointInfo(): Serializable = {
    logger.infof("checkpoint.")
    null
  }

  override def close(): Unit =
    logger.infof("close.")
}

beans.xml/Job XML

CDIのための、beans.xml
src/main/resources/META-INF/beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
                           http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
       bean-discovery-mode="annotated">
</beans>

Job XML。item-countは、4としました。
src/main/resources/META-INF/batch-jobs/my-job.xml

<?xml version="1.0" encoding="UTF-8"?>
<job id="my-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="step">
        <chunk item-count="4">
            <reader ref="MyItemReader"/>
            <processor ref="MyItemProcessor"/>
            <writer ref="MyItemWriter"/>
        </chunk>
    </step>
</job>

BytemanのRule

それでは、トランザクションをトレースするためのBytemanのRuleを書きます。

JBeretの実装を確認したところ、TransactionManagerでトランザクション管理しているようなので、こちらのbegin/commit時点でログ出力してみましょう。
trace-tx.btm

RULE trace TransactionManager.begin
INTERFACE javax.transaction.TransactionManager
METHOD begin
AT ENTRY
IF TRUE
  DO org.jboss.logging.Logger.getLogger($0.getClass()).info("***** TransactionManager#begin *****")
ENDRULE

RULE trace TransactionManager.commit
INTERFACE javax.transaction.TransactionManager
METHOD commit
AT ENTRY
IF TRUE
  DO org.jboss.logging.Logger.getLogger($0.getClass()).info("***** TransactionManager#commit *****")
ENDRULE

ソースコード上では、このあたり。

open時。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L188-L195

Chunk処理中。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L296
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L335
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L381
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L539

readItem、processItem時に、ロールバックが必要になった場合。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L381
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L428
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L603

close時。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L203-L213

※PartitionAnalyzerがある場合
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/StepExecutionRunner.java#L346
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/StepExecutionRunner.java#L388
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/StepExecutionRunner.java#L405

テストコードを動かして確認

それでは、こんなテストコードを書いて動かしてみます。
src/test/scala/org/littlewings/javaee7/batch/TransactionTracingSpec.scala

package org.littlewings.javaee7.batch

import org.jberet.se.Main
import org.scalatest.{Matchers, FunSpec}

class TransactionTracingSpec extends FunSpec with Matchers {
  describe("Transaction Tracing Spec") {
    it("run batch") {
      Main.main(Array("my-job"))
    }
  }
}

実行。

$ sbt test

open時。

INFO: ***** TransactionManager#begin *****
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: open.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter open
INFO: open.
3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: ***** TransactionManager#commit *****

item処理中。

3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: ***** TransactionManager#begin *****
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: readItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: processItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: readItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: processItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: readItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: processItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: readItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: processItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: writeItems[4].
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader checkpointInfo
INFO: checkpoint.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter checkpointInfo
INFO: checkpoint.
3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: ***** TransactionManager#commit *****
3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: ***** TransactionManager#begin *****
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: readItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: processItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: readItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: processItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: readItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: processItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: readItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: processItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: writeItems[4].
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader checkpointInfo
INFO: checkpoint.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter checkpointInfo
INFO: checkpoint.
3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: ***** TransactionManager#commit *****
3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: ***** TransactionManager#begin *****
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: readItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: processItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: readItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: processItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: readItem.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: writeItems[2].
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader checkpointInfo
INFO: checkpoint.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter checkpointInfo
INFO: checkpoint.
3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: ***** TransactionManager#commit *****

close時。

3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: ***** TransactionManager#commit *****
3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: ***** TransactionManager#begin *****
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: close.
3 03, 2016 10:29:22 午後 org.littlewings.javaee7.batch.MyItemReader close
INFO: close.
3 03, 2016 10:29:22 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: ***** TransactionManager#commit *****

確かに、ライフサイクルのとおりのログが出力されましたね。

まとめ

ちょっと気になっていた、jBatchでのトランザクション管理の基本パターンを確認してみました。Partitioned Chunkについては確認していませんが、気が向けば…。

あと、Java SE環境でもJBeretがTransactionManagerを動かしているように見えますが、実際にはこれはほぼ何もしていないダミー?実装みたいです。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/tx/LocalTransactionManager.java

ステータスが変わっているだけっぽいので…。

とりあえず、挙動はなんとなくわかったのでOKとしましょう。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jbatch-chunked-with-tx