CLOVER🍀

That was when it all began.

Tracing the execution order of jBatch Step Partitioning

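jBatch appears to have a mechanism called Step Partitioning, which runs the work inside a single Step in parallel on multiple threads.
Note: to run Steps themselves in parallel, use Split/Flow instead.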

8.2.6 Step Partitioning
The Java Community Process(SM) Program - JSRs: Java Specification Requests - detail JSR# 352

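The interfaces that show up are PartitionMapper, PartitionReducer, PartitionCollector and so on, so I was expecting something MapReduce-like, but it seems to be a little different. I'll start by looking at the execution order.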

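The main players

Step Partitioning performs parallel processing in combination with either a Batchlet or a Chunk. So one of the two, a Batchlet or a Chunk (ItemReader, ItemProcessor, ItemWriter), is required.

Besides those, the following four appear. All of them seem to be optional, though.

  • PartitionMapper … Determines the PartitionPlan, which configures how many partitions the work is split into for parallel execution, the number of threads, and so on. This interface is not executed in parallel
  • PartitionReducer … Implements processing that is invoked before and after the partitioned processing. This interface is not executed in parallel
  • PartitionCollector … Called after each worker finishes a unit of work (the Batchlet, or each chunk). It runs on each worker's thread, and the value it returns can be received by the PartitionAnalyzer
  • PartitionAnalyzer … Receives the values returned by the PartitionCollector after each partition's processing. This interface is not executed in parallel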

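For the processing flow you can read "11.5 Partitioned Batchlet Processing" and "11.7 Partitioned Chunk Processing" in JSR-352, but this time I want to actually run it and look at the execution order.

I'll provide implementations of each interface, but they won't do anything particularly meaningful. The goal is purely to see the execution order.

Preparation

First, the build definition.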
build.sbt

name := "jbatch-partitioned-tracing"

version := "0.0.1-SNAPSHOT"

organization := "org.littlewings"

scalaVersion := "2.11.8"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

fork in Test := true

parallelExecution in Test := false

javaOptions in Test += "-javaagent:/usr/local/byteman/current/lib/byteman.jar=script:trace-tx.btm"

libraryDependencies ++= Seq(
  // jBatch
  "org.jboss.spec.javax.batch" % "jboss-batch-api_1.0_spec" % "1.0.0.Final",
  "org.jberet" % "jberet-se" % "1.2.0.Final",
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.0.Final" % "runtime",
  "org.jboss.weld.se" % "weld-se" % "2.3.3.Final" % "runtime",
  "org.jboss" % "jandex" % "2.0.2.Final" % "runtime",
  "org.wildfly.security" % "wildfly-security-manager" % "1.1.2.Final" % "runtime",
  "org.jboss.marshalling" % "jboss-marshalling" % "1.4.10.Final" % "runtime",

  // Logging
  "org.jboss.logging" % "jboss-logging" % "3.3.0.Final",

  // Test
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

JBeret is used as the jBatch implementation, and the runtime environment is Java SE.

I also want to see when JTA transactions begin and commit, so I trace them with Byteman.
trace-tx.btm

RULE trace TransactionManager.begin
INTERFACE javax.transaction.TransactionManager
METHOD begin
AT ENTRY
IF TRUE
  DO org.jboss.logging.Logger.getLogger($0.getClass()).info("***** TransactionManager#begin *****")
ENDRULE

RULE trace TransactionManager.commit
INTERFACE javax.transaction.TransactionManager
METHOD commit
AT ENTRY
IF TRUE
  DO org.jboss.logging.Logger.getLogger($0.getClass()).info("***** TransactionManager#commit *****")
ENDRULE

CDI is used as well, so a beans.xml is also needed.
src/main/resources/META-INF/beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
                           http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
       bean-discovery-mode="annotated">
</beans>

Preparing the implementation classes for Batchlet and Chunk

A Step needs either a Batchlet or the Chunk-related elements, so I create implementation classes for both.

The Batchlet.
src/main/scala/org/littlewings/javaee7/batch/TraceBatchlet.scala

package org.littlewings.javaee7.batch

import java.util.concurrent.TimeUnit
import javax.batch.api.AbstractBatchlet
import javax.enterprise.context.Dependent
import javax.inject.Named

import org.jboss.logging.Logger

@Dependent
@Named
class TraceBatchlet extends AbstractBatchlet {
  val logger: Logger = Logger.getLogger(getClass)

  override def process(): String = {
    logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#process")

    TimeUnit.SECONDS.sleep(3L)

    "PROCESS"
  }
}

Besides the log output for tracing, it also sleeps briefly so that I can confirm the partitions really run in parallel.

Next, the Chunk-related classes.
src/main/scala/org/littlewings/javaee7/batch/TraceItemReader.scala

package org.littlewings.javaee7.batch

import java.io.Serializable
import java.util.concurrent.TimeUnit
import javax.batch.api.BatchProperty
import javax.batch.api.chunk.ItemReader
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

import org.jboss.logging.Logger

@Dependent
@Named
class TraceItemReader extends ItemReader {
  val logger: Logger = Logger.getLogger(getClass)

  var languages: Iterator[String] = _

  @Inject
  @BatchProperty
  var start: Int = _

  @Inject
  @BatchProperty
  var step: Int = _

  override def open(checkpoint: Serializable): Unit = {
    logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#open")

    languages =
      List("Java", "Scala", "Groovy", "Clojure", "Kotlin", "Perl", "Ruby", "Python", "PHP", "C")
        .drop(start)
        .take(step)
        .iterator

    TimeUnit.SECONDS.sleep(3L)
  }

  override def readItem(): AnyRef = {
    logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#readItem")

    if (languages.hasNext) languages.next
    else null
  }

  override def checkpointInfo(): Serializable = {
    logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#checkpointInfo")
    null
  }

  override def close(): Unit =
    logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#close")

}

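In the Chunk style, the sleep goes into the ItemReader. Also, if every ItemReader running in parallel returned the same data, the data would be multiplied by the number of partitions, so each reader is given its own slice.

How the data is sliced is configured in the PartitionPlan that appears later.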
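As a quick check of the slicing, here is a minimal standalone sketch of what `drop(start).take(step)` yields per partition. The object name `PartitionSlices` is hypothetical, and the start offsets 0, 3, 6, 9 with a step of 3 are assumed from the PartitionMapper and Job XML shown later in this post.

```scala
object PartitionSlices {
  // Same data as in TraceItemReader
  val languages: List[String] =
    List("Java", "Scala", "Groovy", "Clojure", "Kotlin", "Perl", "Ruby", "Python", "PHP", "C")

  // Items one partition would read: skip `start` items, then take at most `step`
  def slice(start: Int, step: Int): List[String] =
    languages.drop(start).take(step)

  def main(args: Array[String]): Unit = {
    val step = 3
    // Assumed offsets for four partitions: 0, 3, 6, 9
    (0 until 4).foreach { i =>
      val start = i * step
      println(s"partition $i (start = $start): ${slice(start, step)}")
    }
  }
}
```

Since the list has only 10 entries, the last slice is shorter than the others: with start = 9 it contains just "C".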

The ItemProcessor.
src/main/scala/org/littlewings/javaee7/batch/TraceItemProcessor.scala

package org.littlewings.javaee7.batch

import javax.batch.api.chunk.ItemProcessor
import javax.enterprise.context.Dependent
import javax.inject.Named

import org.jboss.logging.Logger

@Dependent
@Named
class TraceItemProcessor extends ItemProcessor {
  val logger: Logger = Logger.getLogger(getClass)

  override def processItem(item: AnyRef): AnyRef = {
    logger.info(s"[${Thread.currentThread.getName}] ${getClass.getName}#processItem, item = ${item}")
    item
  }
}

The ItemWriter.
src/main/scala/org/littlewings/javaee7/batch/TraceItemWriter.scala

package org.littlewings.javaee7.batch

import java.io.Serializable
import javax.batch.api.chunk.ItemWriter
import javax.enterprise.context.Dependent
import javax.inject.Named

import org.jboss.logging.Logger

@Dependent
@Named
class TraceItemWriter extends ItemWriter {
  val logger: Logger = Logger.getLogger(getClass)

  override def writeItems(items: java.util.List[AnyRef]): Unit =
    logger.info(s"[${Thread.currentThread.getName}] ${getClass.getName}#writeItems, items = ${items}")

  override def checkpointInfo(): Serializable = {
    logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#checkpointInfo")
    null
  }

  override def close(): Unit =
    logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#close")

  override def open(checkpoint: Serializable): Unit =
    logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#open")
}

Implementing the Partition* interfaces

Now I'll create implementations of the various Partition* interfaces.

The PartitionMapper.
src/main/scala/org/littlewings/javaee7/batch/TracePartitionMapper.scala

package org.littlewings.javaee7.batch

import javax.batch.api.partition.{PartitionMapper, PartitionPlan, PartitionPlanImpl}
import javax.enterprise.context.Dependent
import javax.inject.Named

import org.jboss.logging.Logger

@Dependent
@Named
class TracePartitionMapper extends PartitionMapper {
  val logger: Logger = Logger.getLogger(getClass)

  override def mapPartitions(): PartitionPlan = {
    logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#mapPartitions.")

    val range = 3
    val (partitionSize, threadSize) = (4, 4)

    val partitionPlan = new PartitionPlanImpl
    partitionPlan.setPartitions(partitionSize)
    partitionPlan.setThreads(threadSize)
    partitionPlan.setPartitionProperties((1 to partitionSize).map { i =>
      val properties = new java.util.Properties
      properties.setProperty("partition.start", ((i - 1) * range).toString)
      properties
    }.toArray)

    partitionPlan
  }
}

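In the PartitionMapper, creating a PartitionPlan configures how many partitions the Step is split into and how many threads run them.

Here I explicitly set the partition count and the thread count to the same value, but the default thread count is 0, and in that state it is set to the same number as the partition count.

In addition, a Properties object has to be supplied for each partition, as an array.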
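To make the shape of that data concrete, here is a small standalone sketch that builds the per-partition Properties array and applies the thread-count rule described above. It deliberately does not touch the jBatch API; the object name `PartitionPlanSketch` and the helpers `partitionProperties` and `effectiveThreads` are hypothetical names introduced only for illustration.

```scala
import java.util.Properties

object PartitionPlanSketch {
  // One Properties object per partition, each carrying its own "partition.start"
  def partitionProperties(partitions: Int, range: Int): Array[Properties] =
    (0 until partitions).map { i =>
      val properties = new Properties
      properties.setProperty("partition.start", (i * range).toString)
      properties
    }.toArray

  // Thread count as described in the text: 0 means "same as the partition count"
  def effectiveThreads(partitions: Int, threads: Int): Int =
    if (threads == 0) partitions else threads

  def main(args: Array[String]): Unit = {
    val props = partitionProperties(partitions = 4, range = 3)
    props.zipWithIndex.foreach { case (p, i) =>
      println(s"partition $i -> partition.start = ${p.getProperty("partition.start")}")
    }
    println(s"threads = ${effectiveThreads(partitions = 4, threads = 0)}")
  }
}
```

The array index corresponds to the partition, so the element at index 0 holds the properties for the first partition, and so on.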

The PartitionReducer.
src/main/scala/org/littlewings/javaee7/batch/TracePartitionReducer.scala

package org.littlewings.javaee7.batch

import javax.batch.api.partition.PartitionReducer
import javax.batch.api.partition.PartitionReducer.PartitionStatus
import javax.enterprise.context.Dependent
import javax.inject.Named

import org.jboss.logging.Logger

@Dependent
@Named
class TracePartitionReducer extends PartitionReducer {
  val logger: Logger = Logger.getLogger(getClass)

  override def rollbackPartitionedStep(): Unit =
    logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#rollbackPartitionedStep")

  override def beginPartitionedStep(): Unit =
    logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#beginPartitionedStep")

  override def beforePartitionedStepCompletion(): Unit =
    logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#beforePartitionedStepCompletion")

  override def afterPartitionedStepCompletion(status: PartitionStatus): Unit =
    logger.info(s"[${Thread.currentThread.getName}] ${getClass.getName}#afterPartitionedStepCompletion, status = ${status}")
}

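For something named Reducer, it has no methods that take data as arguments. The only exception is afterPartitionedStepCompletion, which receives a PartitionStatus, but that is just an enum that tells you COMMIT or ROLLBACK.

In short, it seems best to think of it as an interface for hooking in some processing at points such as the start and end of the partitioned step.

The PartitionCollector.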
src/main/scala/org/littlewings/javaee7/batch/TracePartitionCollector.scala

package org.littlewings.javaee7.batch

import java.io.Serializable
import javax.batch.api.partition.PartitionCollector
import javax.enterprise.context.Dependent
import javax.inject.Named

import org.jboss.logging.Logger

@Dependent
@Named
class TracePartitionCollector extends PartitionCollector {
  val logger: Logger = Logger.getLogger(getClass)

  override def collectPartitionData(): Serializable = {
    logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#collectPartitionData")
    s"COLLECTED-${Thread.currentThread.getName}"
  }
}

Here you implement processing that returns Serializable data.

Finally, the PartitionAnalyzer.
src/main/scala/org/littlewings/javaee7/batch/TracePartitionAnalyzer.scala

package org.littlewings.javaee7.batch

import java.io.Serializable
import javax.batch.api.partition.PartitionAnalyzer
import javax.batch.runtime.BatchStatus
import javax.enterprise.context.Dependent
import javax.inject.Named

import org.jboss.logging.Logger

@Dependent
@Named
class TracePartitionAnalyzer extends PartitionAnalyzer {
  val logger: Logger = Logger.getLogger(getClass)

  override def analyzeCollectorData(data: Serializable): Unit =
    logger.info(s"[${Thread.currentThread.getName}] ${getClass.getName}#analyzeCollectorData, data = ${data}")

  override def analyzeStatus(batchStatus: BatchStatus, exitStatus: String): Unit =
    logger.info(s"[${Thread.currentThread.getName}] ${getClass.getName}#analyzeStatus, batchStatus = ${batchStatus}, exitStatus = ${exitStatus}")
}

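It receives the data returned by the PartitionCollector, and at the end it also receives the status of each partition.

Running it

Now let's run the implemented elements from test code.

Here is the skeleton of the test code.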
src/test/scala/org/littlewings/javaee7/batch/PartitionedBatchSpec.scala

package org.littlewings.javaee7.batch

import java.util.concurrent.TimeUnit
import javax.batch.runtime.{BatchRuntime, BatchStatus}

import org.jberet.runtime.JobExecutionImpl
import org.scalatest.{FunSpec, Matchers}

class PartitionedBatchSpec extends FunSpec with Matchers {
  describe("Partitioned Batch Spec") {
    val jobOperator = BatchRuntime.getJobOperator

    // Write the tests here!
  }
}

The JobOperator is obtained up front.

Now for the first case. Let's start with the Batchlet.

    it("Trace Partitioned Batchlet") {
      val jobId = "trace-partition-batchlet"
      val properties = new java.util.Properties

      val executionId = jobOperator.start(jobId, properties)

      val jobExecution = jobOperator.getJobExecution(executionId)
      jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS)

      jobExecution.getBatchStatus should be(BatchStatus.COMPLETED)
    }

Job XML
src/main/resources/META-INF/batch-jobs/trace-partition-batchlet.xml

<?xml version="1.0" encoding="UTF-8"?>
<job id="trace-partition-batchlet" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="step">
        <batchlet ref="traceBatchlet"/>
        <partition>
            <mapper ref="tracePartitionMapper"/>
            <reducer ref="tracePartitionReducer"/>
            <collector ref="tracePartitionCollector"/>
            <analyzer ref="tracePartitionAnalyzer"/>
        </partition>
    </step>
</job>

All of the Partition* implementations prepared above are used.

Execution log.

3 26, 2016 6:57:33 午後 org.littlewings.javaee7.batch.TracePartitionReducer beginPartitionedStep
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#beginPartitionedStep
3 26, 2016 6:57:33 午後 org.littlewings.javaee7.batch.TracePartitionMapper mapPartitions
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionMapper#mapPartitions.
3 26, 2016 6:57:33 午後 org.littlewings.javaee7.batch.TraceBatchlet process
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceBatchlet#process
3 26, 2016 6:57:33 午後 org.littlewings.javaee7.batch.TraceBatchlet process
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceBatchlet#process
3 26, 2016 6:57:33 午後 org.littlewings.javaee7.batch.TraceBatchlet process
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceBatchlet#process
3 26, 2016 6:57:33 午後 org.littlewings.javaee7.batch.TraceBatchlet process
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceBatchlet#process
3 26, 2016 6:57:33 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-1] ***** TransactionManager#begin *****
3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData
INFO: [jberet-2] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData
3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData
INFO: [jberet-3] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData
3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData
INFO: [jberet-5] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData
3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData
INFO: [jberet-4] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData
3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-2
3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = PROCESS
3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-3
3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = PROCESS
3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-5
3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = PROCESS
3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-4
3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = PROCESS
3 26, 2016 6:57:37 午後 org.littlewings.javaee7.batch.TracePartitionReducer beforePartitionedStepCompletion
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#beforePartitionedStepCompletion
3 26, 2016 6:57:37 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-1] ***** TransactionManager#commit *****
3 26, 2016 6:57:37 午後 org.littlewings.javaee7.batch.TracePartitionReducer afterPartitionedStepCompletion
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#afterPartitionedStepCompletion, status = COMMIT

This time there are 4 partitions and 4 threads. The Batchlet runs 4 times, each on a different thread.

With Partition + Batchlet, the flow is as follows.

  • PartitionReducer#beginPartitionedStep
  • PartitionMapper#mapPartitions
  • Executed once per partition
    • Batchlet#process (worker thread)
    • Batchlet#stop (worker thread; only if a stop is requested, so it does not appear in the log above)
    • PartitionCollector#collectPartitionData (worker thread)
  • begin transaction
  • Executed once per piece of data returned by the PartitionCollector
    • PartitionAnalyzer#analyzeCollectorData
    • PartitionAnalyzer#analyzeStatus (if a rollback occurs, PartitionReducer#rollbackPartitionedStep)
  • PartitionReducer#beforePartitionedStepCompletion
  • commit transaction
  • PartitionReducer#afterPartitionedStepCompletion

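Reading "11.5 Partitioned Batchlet Processing" in JSR-352, there did not seem to be any transaction involved, but when run on JBeret a transaction was started when a PartitionAnalyzer is present...

Also, the PartitionAnalyzer starts running regardless of whether the Batchlets have finished.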
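To picture the threading seen in the log (the collector runs on each worker thread, the analyzer on a single thread), here is a small standalone sketch. It does not use jBatch or JBeret at all: the queue-based handoff, the object name `CollectorAnalyzerSketch`, the method `simulate`, and the sleep durations are all my own assumptions for illustration, not a description of how JBeret is implemented.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object CollectorAnalyzerSketch {
  // Runs `partitions` workers and returns the data in the order the "analyzer" saw it
  def simulate(partitions: Int): List[String] = {
    val queue = new LinkedBlockingQueue[String]
    val pool = Executors.newFixedThreadPool(partitions)

    (1 to partitions).foreach { i =>
      pool.execute(new Runnable {
        override def run(): Unit = {
          TimeUnit.MILLISECONDS.sleep(100L * i) // stand-in for Batchlet#process
          queue.put(s"COLLECTED-$i")            // stand-in for collectPartitionData
        }
      })
    }

    // "Analyzer" side: the calling thread consumes each result as soon as it arrives
    val analyzed = (1 to partitions).map(_ => queue.take()).toList

    pool.shutdown()
    analyzed
  }

  def main(args: Array[String]): Unit =
    simulate(4).foreach(data => println(s"analyzeCollectorData, data = $data"))
}
```

The point of the sketch is only that a single consumer can handle results one at a time while other workers are still running, which matches the single-threaded analyzer calls in the log.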

Next, the Chunk style.

    it("Trace Partitioned Chunk") {
      val jobId = "trace-partition-chunk"
      val properties = new java.util.Properties

      val executionId = jobOperator.start(jobId, properties)

      val jobExecution = jobOperator.getJobExecution(executionId)
      jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS)

      jobExecution.getBatchStatus should be(BatchStatus.COMPLETED)
    }

Here is the Job XML.
src/main/resources/META-INF/batch-jobs/trace-partition-chunk.xml

<?xml version="1.0" encoding="UTF-8"?>
<job id="trace-partition-chunk" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="step">
        <chunk item-count="3">
            <reader ref="traceItemReader">
                <properties>
                    <property name="start" value="#{partitionPlan['partition.start']}"/>
                    <property name="step" value="3"/>
                </properties>
            </reader>
            <processor ref="traceItemProcessor"/>
            <writer ref="traceItemWriter"/>
        </chunk>
        <partition>
            <mapper ref="tracePartitionMapper"/>
            <reducer ref="tracePartitionReducer"/>
            <collector ref="tracePartitionCollector"/>
            <analyzer ref="tracePartitionAnalyzer"/>
        </partition>
    </step>
</job>

The ItemReader's properties configure things such as the range of data that the ItemReader returns.

Execution log. It's a bit long...

3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TracePartitionReducer beginPartitionedStep
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#beginPartitionedStep
3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TracePartitionMapper mapPartitions
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionMapper#mapPartitions.
3 26, 2016 6:57:40 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-1] ***** TransactionManager#begin *****
3 26, 2016 6:57:40 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-2] ***** TransactionManager#begin *****
3 26, 2016 6:57:40 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-4] ***** TransactionManager#begin *****
3 26, 2016 6:57:40 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-5] ***** TransactionManager#begin *****
3 26, 2016 6:57:40 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-3] ***** TransactionManager#begin *****
3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TraceItemReader open
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#open
3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TraceItemReader open
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#open
3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TraceItemReader open
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#open
3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TraceItemReader open
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#open
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter open
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#open
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter open
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemWriter#open
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter open
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#open
3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-3] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter open
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#open
3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-5] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-4] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-2] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-4] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-3] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-5] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-2] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = C
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter writeItems
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#writeItems, items = [C]
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Clojure
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Java
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Kotlin
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Ruby
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Scala
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Perl
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter writeItems
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemWriter#writeItems, items = [Clojure, Kotlin, Perl]
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Groovy
3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0
INFO: [jberet-4] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter writeItems
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#writeItems, items = [Java, Scala, Groovy]
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Python
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-5] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData
INFO: [jberet-4] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData
INFO: [jberet-5] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-5] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-4
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-4] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-5
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = PHP
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-5] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-3] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter close
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#close
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-5] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter writeItems
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#writeItems, items = [Ruby, Python, PHP]
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter close
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemWriter#close
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader close
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#close
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-4] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader close
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#close
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-5] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData
INFO: [jberet-4] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData
INFO: [jberet-3] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-4
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData
INFO: [jberet-5] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = COMPLETED
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-3] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-3
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-2] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-5
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = COMPLETED
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData
INFO: [jberet-2] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-2] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-2
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-3] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-3] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-2] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter close
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#close
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-2] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader close
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#close
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter close
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#close
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-3] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader close
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#close
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData
INFO: [jberet-3] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-2] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-3
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = COMPLETED
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData
INFO: [jberet-2] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-2
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = COMPLETED
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionReducer beforePartitionedStepCompletion
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#beforePartitionedStepCompletion
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-1] ***** TransactionManager#commit *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionReducer afterPartitionedStepCompletion
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#afterPartitionedStepCompletion, status = COMMIT

With Partition + Chunk, the flow is as follows.

  • PartitionReducer#beginPartitionedStep
  • PartitionMapper#mapPartitions
  • Run once per Partition
    • begin transaction (Worker thread)
    • ItemReader#open (Worker thread)
    • ItemWriter#open (Worker thread)
    • commit transaction (Worker thread)
    • Repeat while items remain
      • begin transaction (Worker thread)
      • ItemReader#readItem × N
        • ItemProcessor#processItem (Worker thread)
      • ItemWriter#writeItems (Worker thread)
      • ItemReader#checkpointInfo (Worker thread)
      • ItemWriter#checkpointInfo (Worker thread)
      • commit transaction (Worker thread)
      • PartitionCollector#collectPartitionData (Worker thread)
    • begin transaction (Worker thread)
    • ItemWriter#close (Worker thread)
    • ItemReader#close (Worker thread)
    • commit transaction (Worker thread)
  • begin transaction
  • Run once per piece of data returned by PartitionCollector
    • PartitionAnalyzer#analyzeCollectorData
    • PartitionAnalyzer#analyzeStatus (if a rollback occurs, PartitionReducer#rollbackPartitionedStep)
  • PartitionReducer#beforePartitionedStepCompletion
  • commit transaction
  • PartitionReducer#afterPartitionedStepCompletion
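
As a rough aid for checking one Worker thread's log against the list above, the per-Worker part of the flow can be written down as a small function. This is only a sketch that follows the list literally: `ChunkFlowSketch` and `workerTrace` are hypothetical names, nothing here calls jBatch or JBeret, and the extra `readItem` that returns no item at the end of input (visible in the log) is deliberately not modeled.

```scala
object ChunkFlowSketch {
  // Hypothetical helper: the callbacks one Worker thread is expected to log,
  // following the list above literally. `itemCount` is the chunk's item-count.
  def workerTrace(items: Seq[String], itemCount: Int): Vector[String] = {
    require(itemCount > 0, "itemCount must be positive")

    // Reader and writer are opened inside their own transaction.
    val open = Vector("begin", "ItemReader#open", "ItemWriter#open", "commit")

    // One transaction per chunk of at most `itemCount` items.
    val chunks = items.grouped(itemCount).toVector.flatMap { chunk =>
      val readAndProcess = chunk.toVector.flatMap { _ =>
        Vector("ItemReader#readItem", "ItemProcessor#processItem")
      }
      Vector("begin") ++ readAndProcess ++ Vector(
        "ItemWriter#writeItems",
        "ItemReader#checkpointInfo",
        "ItemWriter#checkpointInfo",
        "commit",
        "PartitionCollector#collectPartitionData"
      )
    }

    // Writer is closed before the reader, again in its own transaction.
    val close = Vector("begin", "ItemWriter#close", "ItemReader#close", "commit")

    open ++ chunks ++ close
  }
}
```

For example, `ChunkFlowSketch.workerTrace(Seq("C"), 3).foreach(println)` prints the expected sequence for a partition that holds a single item, which can then be compared by eye with the lines of one `[jberet-N]` thread.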

Here too, PartitionAnalyzer starts running without waiting for the chunk processing on the Worker threads to finish, so the log ends up rather chaotic...
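
One way to make an interleaved log like this easier to read is to regroup the message lines by thread name. The following is a minimal sketch, assuming the log format shown in this post (`INFO: [thread] message`); `LogByThread` and `group` are hypothetical names, and the timestamp lines are simply skipped because they carry no thread name.

```scala
object LogByThread {
  // Matches message lines such as "INFO: [jberet-4] ...#readItem".
  private val Line = """^INFO: \[([^\]]+)\] (.*)$""".r

  // Groups messages by thread name, keeping the original order within each
  // thread. Lines that do not match (e.g. timestamp lines) are ignored.
  def group(lines: Seq[String]): Map[String, Vector[String]] =
    lines.foldLeft(Map.empty[String, Vector[String]]) {
      case (acc, Line(thread, message)) =>
        acc.updated(thread, acc.getOrElse(thread, Vector.empty) :+ message)
      case (acc, _) =>
        acc
    }
}
```

Applied to the saved log lines, `LogByThread.group(lines)("jberet-1")` would give only the PartitionReducer and PartitionAnalyzer side, while each remaining key gives the call sequence of one Worker.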

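Summary

I traced a little of how things behave when the various Partition interfaces are used. PartitionReducer shows up in places quite different from what I had expected, which was a bit of a surprise...

I have not yet passed any data between the interfaces, so I will leave that for another time.

If you want to trace this in the source code, these are good places to look.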
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/StepExecutionRunner.java
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/BatchletRunner.java

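Also, here is the entry I used as a reference.
Partitioned Chunk Processingで並列処理してみる ("Trying out parallel processing with Partitioned Chunk Processing")

The code written for this post is available here.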
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jbatch-partitioned-tracing

Bonus

I wrote that using each of the Partition* interfaces is optional, so as a bonus, here is the pattern that uses only PartitionReducer.

First, the Batchlet pattern.

    it("Trace Partitioned Batchlet, Reducer Only") {
      val jobId = "trace-partition-batchlet-reducer-only"
      val properties = new java.util.Properties

      val executionId = jobOperator.start(jobId, properties)

      val jobExecution = jobOperator.getJobExecution(executionId)
      jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS)

      jobExecution.getBatchStatus should be(BatchStatus.COMPLETED)
    }

Job XML
src/main/resources/META-INF/batch-jobs/trace-partition-batchlet-reducer-only.xml

<?xml version="1.0" encoding="UTF-8"?>
<job id="trace-partition-batchlet-reducer-only" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="step">
        <batchlet ref="traceBatchlet"/>
        <partition>
            <plan partitions="4" threads="4">
                <properties partition="0"/>
                <properties partition="1"/>
                <properties partition="2"/>
                <properties partition="3"/>
            </plan>
            <reducer ref="tracePartitionReducer"/>
        </partition>
    </step>
</job>

When PartitionMapper is not used, the PartitionPlan is defined in the Job XML.

Execution log.

3 26, 2016 6:57:37 午後 org.littlewings.javaee7.batch.TracePartitionReducer beginPartitionedStep
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#beginPartitionedStep
3 26, 2016 6:57:37 午後 org.littlewings.javaee7.batch.TraceBatchlet process
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceBatchlet#process
3 26, 2016 6:57:37 午後 org.littlewings.javaee7.batch.TraceBatchlet process
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceBatchlet#process
3 26, 2016 6:57:37 午後 org.littlewings.javaee7.batch.TraceBatchlet process
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceBatchlet#process
3 26, 2016 6:57:37 午後 org.littlewings.javaee7.batch.TraceBatchlet process
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceBatchlet#process
3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TracePartitionReducer beforePartitionedStepCompletion
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#beforePartitionedStepCompletion
3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TracePartitionReducer afterPartitionedStepCompletion
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#afterPartitionedStepCompletion, status = COMMIT

Since PartitionAnalyzer is gone, the JTA transaction begin/commit calls have disappeared as well.

Next, the Chunk style with only a Reducer.

    it("Trace Partitioned Chunk, Reducer Only") {
      val jobId = "trace-partition-chunk-reducer-only"
      val properties = new java.util.Properties

      val executionId = jobOperator.start(jobId, properties)

      val jobExecution = jobOperator.getJobExecution(executionId)
      jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS)

      jobExecution.getBatchStatus should be(BatchStatus.COMPLETED)
    }

Job XML
src/main/resources/META-INF/batch-jobs/trace-partition-chunk-reducer-only.xml

<?xml version="1.0" encoding="UTF-8"?>
<job id="trace-partition-chunk-reducer-only" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="step">
        <chunk item-count="3">
            <reader ref="traceItemReader">
                <properties>
                    <property name="start" value="#{partitionPlan['partition.start']}"/>
                    <property name="step" value="3"/>
                </properties>
            </reader>
            <processor ref="traceItemProcessor"/>
            <writer ref="traceItemWriter"/>
        </chunk>
        <partition>
            <plan partitions="4" threads="4">
                <properties partition="0">
                    <property name="partition.start" value="0"/>
                </properties>
                <properties partition="1">
                    <property name="partition.start" value="3"/>
                </properties>
                <properties partition="2">
                    <property name="partition.start" value="6"/>
                </properties>
                <properties partition="3">
                    <property name="partition.start" value="9"/>
                </properties>
            </plan>
            <reducer ref="tracePartitionReducer"/>
        </partition>
    </step>
</job>

In this case, the Properties have to be defined one partition at a time, somewhat like an array...

Execution log.
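
The `partition.start` values in the XML above (0, 3, 6, 9) are just the partition index multiplied by the step of 3, so the same per-partition Properties could also be computed. The sketch below only builds the array; `PartitionProps` and `build` are hypothetical names. `PartitionPlan#setPartitionProperties` does accept a `Properties[]`, so an array like this could be handed over from a PartitionMapper, but that wiring is not shown and was not tried in this post.

```scala
import java.util.Properties

object PartitionProps {
  // Hypothetical helper: one Properties per partition, where partition i
  // starts reading at i * step (0, 3, 6, 9 for four partitions and step 3).
  def build(partitions: Int, step: Int): Array[Properties] =
    Array.tabulate(partitions) { i =>
      val props = new Properties
      props.setProperty("partition.start", (i * step).toString)
      props
    }
}
```

`PartitionProps.build(4, 3)` gives four Properties objects whose `partition.start` values line up with the four `<properties partition="...">` elements in the Job XML.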

3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionReducer beginPartitionedStep
INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#beginPartitionedStep
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-2] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader open
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#open
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-5] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader open
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#open
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-3] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader open
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#open
3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-4] ***** TransactionManager#begin *****
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader open
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#open
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter open
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#open
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-2] ***** TransactionManager#commit *****
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-2] ***** TransactionManager#begin *****
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Java
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter open
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#open
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-3] ***** TransactionManager#commit *****
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-3] ***** TransactionManager#begin *****
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter open
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemWriter#open
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter open
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#open
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-4] ***** TransactionManager#commit *****
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-4] ***** TransactionManager#begin *****
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = C
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter writeItems
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#writeItems, items = [C]
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-4] ***** TransactionManager#commit *****
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Scala
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Groovy
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter writeItems
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#writeItems, items = [Java, Scala, Groovy]
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-2] ***** TransactionManager#commit *****
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-4] ***** TransactionManager#begin *****
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Clojure
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Kotlin
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-5] ***** TransactionManager#commit *****
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Perl
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-2] ***** TransactionManager#begin *****
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter writeItems
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#writeItems, items = [Clojure, Kotlin, Perl]
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter close
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#close
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-5] ***** TransactionManager#begin *****
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader close
INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#close
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-4] ***** TransactionManager#commit *****
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-3] ***** TransactionManager#commit *****
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Ruby
3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke
INFO: [jberet-3] ***** TransactionManager#begin *****
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo
INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Python
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo
INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo
3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem
INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#readItem