jBatchには、Step PartitioninというStep内でマルチスレッドを使った並列処理の仕組みがあるようです。
※Step自体を並列実行する場合は、Split/Flowを使用します
8.2.6 Step Partitioning
The Java Community Process(SM) Program - JSRs: Java Specification Requests - detail JSR# 352
出てくるインターフェースが、PartitionMapper、PartitionReducer、PartitionCollectorとかだったので、Map Reduceっぽいものを予想してたらちょっと違う気がするのでまずは実行順から見ていくことにします。
主要な登場人物
Step Partitionningでは、BatchletもしくはChunkと組み合わせて並列処理を行います。よって、BatchletまたはChunk(ItemReader、ItemProcessor、ItemWriter)のいずれかが必要です。
それ以外には、以下の4つが登場します。いずれの要素も任意だったりするようですが。
- PartitionMapper … 処理をいくつに分けて並列実行させるか、またスレッド数などを設定するPartitionPlanを決定します。このインターフェースは、並列実行されません
- PartitionReducer … 各処理の前後で呼び出される処理を実装します。このインターフェースは、並列実行されません
- PartitionCollector … 各Worker(Batchletおよびchunk単位)の処理の後に呼び出されます。各Workerのスレッドで実行され、このインターフェースで返した値を、PartitionAnalyzerで受け取ることができます
- PartitionAnalyzer … 各Partitionの処理の後に、PartitionCollectorが返した値を受け取ることができます。このインターフェースは、並列実行されません
動作フローについては、JSR-352の「11.5 Partitioned Batchlet Processsing」、「11.7 Partitioned Chunk Processing」を見ればよいのですが、今回は実際に動かして実行順を見ていきたいと思います。
各種インターフェースの実装は用意しますが、あまり意味のある内容は実装しません。あくまで、実行順を見ることが目的です。
準備
まずは、ビルド定義。
build.sbt
name := "jbatch-partitioned-tracing" version := "0.0.1-SNAPSHOT" organization := "org.littlewings" scalaVersion := "2.11.8" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) fork in Test := true parallelExecution in Test := false javaOptions in Test += "-javaagent:/usr/local/byteman/current/lib/byteman.jar=script:trace-tx.btm" libraryDependencies ++= Seq( // jBatch "org.jboss.spec.javax.batch" % "jboss-batch-api_1.0_spec" % "1.0.0.Final", "org.jberet" % "jberet-se" % "1.2.0.Final", "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.0.Final" % "runtime", "org.jboss.weld.se" % "weld-se" % "2.3.3.Final" % "runtime", "org.jboss" % "jandex" % "2.0.2.Final" % "runtime", "org.wildfly.security" % "wildfly-security-manager" % "1.1.2.Final" % "runtime", "org.jboss.marshalling" % "jboss-marshalling" % "1.4.10.Final" % "runtime", // Logging "org.jboss.logging" % "jboss-logging" % "3.3.0.Final", // Test "org.scalatest" %% "scalatest" % "2.2.6" % "test" )
jBatchの実装は、JBeretを使用します。また、実行環境はJava SEとします。
また、JTAのトランザクション開始、コミットのタイミングも合わせて見るため、Bytemanでトレースするようにします。
trace-tx.btm
RULE trace TransactionManager.begin INTERFACE javax.transaction.TransactionManager METHOD begin AT ENTRY IF TRUE DO org.jboss.logging.Logger.getLogger($0.getClass()).info("***** TransactionManager#begin *****") ENDRULE RULE trace TransactionManager.commit INTERFACE javax.transaction.TransactionManager METHOD commit AT ENTRY IF TRUE DO org.jboss.logging.Logger.getLogger($0.getClass()).info("***** TransactionManager#commit *****") ENDRULE
CDIも使うので、beans.xmlも用意します。
src/main/resources/META-INF/beans.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd" bean-discovery-mode="annotated"> </beans>
Batchet、Chunk用の実装クラスの用意
Step内では、BatchletまたはChunk関連の要素が必要になるため、それぞれ実装クラスを作成しておきます。
Batchlet。
src/main/scala/org/littlewings/javaee7/batch/TraceBatchlet.scala
package org.littlewings.javaee7.batch import java.util.concurrent.TimeUnit import javax.batch.api.AbstractBatchlet import javax.enterprise.context.Dependent import javax.inject.Named import org.jboss.logging.Logger @Dependent @Named class TraceBatchlet extends AbstractBatchlet { val logger: Logger = Logger.getLogger(getClass) override def process(): String = { logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#process") TimeUnit.SECONDS.sleep(3L) "PROCESS" } }
トレース用のログ出力以外にも、並列に動いていることを確認するためにちょっとスリープを入れています。
続いて、Chunk関連のクラス。
src/main/scala/org/littlewings/javaee7/batch/TraceItemReader.scala
package org.littlewings.javaee7.batch import java.io.Serializable import java.util.concurrent.TimeUnit import javax.batch.api.BatchProperty import javax.batch.api.chunk.ItemReader import javax.enterprise.context.Dependent import javax.inject.{Inject, Named} import org.jboss.logging.Logger @Dependent @Named class TraceItemReader extends ItemReader { val logger: Logger = Logger.getLogger(getClass) var languages: Iterator[String] = _ @Inject @BatchProperty var start: Int = _ @Inject @BatchProperty var step: Int = _ override def open(checkpoint: Serializable): Unit = { logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#open") languages = List("Java", "Scala", "Groovy", "Clojure", "Kotlin", "Perl", "Ruby", "Python", "PHP", "C") .drop(start) .take(step) .iterator TimeUnit.SECONDS.sleep(3L) } override def readItem(): AnyRef = { logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#readItem") if (languages.hasNext) languages.next else null } override def checkpointInfo(): Serializable = { logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#checkpointInfo") null } override def close(): Unit = logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#close") }
Chunk方式では、ItemReaderにスリープを入れています。また、並列実行されるItemReaderが全部同じデータを返してしまうと、データがPartition数分倍増してしまうので、区切るようにしています。
区切り方は、後に出てくるPartitionPlanで設定します。
ItemProcessor。
src/main/scala/org/littlewings/javaee7/batch/TraceItemProcessor.scala
package org.littlewings.javaee7.batch import javax.batch.api.chunk.ItemProcessor import javax.enterprise.context.Dependent import javax.inject.Named import org.jboss.logging.Logger @Dependent @Named class TraceItemProcessor extends ItemProcessor { val logger: Logger = Logger.getLogger(getClass) override def processItem(item: AnyRef): AnyRef = { logger.info(s"[${Thread.currentThread.getName}] ${getClass.getName}#processItem, item = ${item}") item } }
ItemWriter。
src/main/scala/org/littlewings/javaee7/batch/TraceItemWriter.scala
package org.littlewings.javaee7.batch import java.io.Serializable import javax.batch.api.chunk.ItemWriter import javax.enterprise.context.Dependent import javax.inject.Named import org.jboss.logging.Logger @Dependent @Named class TraceItemWriter extends ItemWriter { val logger: Logger = Logger.getLogger(getClass) override def writeItems(items: java.util.List[AnyRef]): Unit = logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#writeItems, items = ${items}") override def checkpointInfo(): Serializable = { logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#checkpointInfo") null } override def close(): Unit = logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#close") override def open(checkpoint: Serializable): Unit = logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#open") }
各種Partition系の実装
それでは、各種Partition〜インターフェースの実装を作成していきます。
PartitionMapper。
src/main/scala/org/littlewings/javaee7/batch/TracePartitionMapper.scala
package org.littlewings.javaee7.batch import javax.batch.api.partition.{PartitionMapper, PartitionPlan, PartitionPlanImpl} import javax.enterprise.context.Dependent import javax.inject.Named import org.jboss.logging.Logger @Dependent @Named class TracePartitionMapper extends PartitionMapper { var logger: Logger = Logger.getLogger(getClass) override def mapPartitions(): PartitionPlan = { logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#mapPartitions.") val range = 3 val (partitionSize, threadSize) = (4, 4) val partitionPlan = new PartitionPlanImpl partitionPlan.setPartitions(partitionSize) partitionPlan.setThreads(threadSize) partitionPlan.setPartitionProperties((1 to partitionSize).map { i => val properties = new java.util.Properties properties.setProperty("partition.start", ((i - 1) * 3).toString) properties }.toArray) partitionPlan } }
PartitionMapperでは、PartitionPlanを作成することで、該当のStepをいくつのPartitionに分け、いくつのスレッドで実行するのかを設定します。
今回は明示的にPartition数とスレッド数を同じにしていますが、デフォルトのスレッド数は0で、その状態だとPartition数と同じ数に設定されます。
あとは、Partition単位にPropertiesを配列で設定する必要があります。
PartitionReducer。
src/main/scala/org/littlewings/javaee7/batch/TracePartitionReducer.scala
package org.littlewings.javaee7.batch import javax.batch.api.partition.PartitionReducer import javax.batch.api.partition.PartitionReducer.PartitionStatus import javax.enterprise.context.Dependent import javax.inject.Named import org.jboss.logging.Logger @Dependent @Named class TracePartitionReducer extends PartitionReducer { var logger: Logger = Logger.getLogger(getClass) override def rollbackPartitionedStep(): Unit = logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#rollbackPartitionedStep") override def beginPartitionedStep(): Unit = logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#beginPartitionedStep") override def beforePartitionedStepCompletion(): Unit = logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#beforePartitionedStepCompletion") override def afterPartitionedStepCompletion(status: PartitionStatus): Unit = logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#afterPartitionedStepCompletion, status = ${status}") }
Reducerという名前の割には、特に引数に何かデータを受け取るメソッドがありません。唯一、PartitionStatusというものを受け取るafterPartitionedStepCompletionというメソッドがありますが、これはCOMMIT/ROLLBACKがわかるだけの列挙型です。
要するに、各処理の開始、終了時などで何かしら処理を入れるためのインターフェースと思えばよさそうです。
PartitionCollector。
src/main/scala/org/littlewings/javaee7/batch/TracePartitionCollector.scala
package org.littlewings.javaee7.batch import java.io.Serializable import javax.batch.api.partition.PartitionCollector import javax.enterprise.context.Dependent import javax.inject.Named import org.jboss.logging.Logger @Dependent @Named class TracePartitionCollector extends PartitionCollector { val logger: Logger = Logger.getLogger(getClass) override def collectPartitionData(): Serializable = { logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#collectPartitionData") s"COLLECTED-${Thread.currentThread.getName}" } }
ここでは、Serializableなデータを返す処理を実装します。
最後、PartitionAnalyzer。
src/main/scala/org/littlewings/javaee7/batch/TracePartitionAnalyzer.scala
package org.littlewings.javaee7.batch import java.io.Serializable import javax.batch.api.partition.PartitionAnalyzer import javax.batch.runtime.BatchStatus import javax.enterprise.context.Dependent import javax.inject.Named import org.jboss.logging.Logger @Dependent @Named class TracePartitionAnalyzer extends PartitionAnalyzer { val logger: Logger = Logger.getLogger(getClass) override def analyzeCollectorData(data: Serializable): Unit = logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#analyzeCollectorData, data = ${data}") override def analyzeStatus(batchStatus: BatchStatus, exitStatus: String): Unit = logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#analyzeStatus, batchStatus = ${batchStatus}, exitStatus = ${exitStatus}") }
PartitionCollectorが返したデータを受け取ることができ、最後にステータスを受け取るように実装します。
動かしてみる
では、実装した各要素をテストコードで動かしてみます。
テストコードの雛形は、こちら。
src/test/scala/org/littlewings/javaee7/batch/PartitionedBatchSpec.scala
package org.littlewings.javaee7.batch import java.util.concurrent.TimeUnit import javax.batch.runtime.{BatchRuntime, BatchStatus} import org.jberet.runtime.JobExecutionImpl import org.scalatest.{FunSpec, Matchers} class PartitionedBatchSpec extends FunSpec with Matchers { describe("Partitioned Batch Spec") { val jobOperator = BatchRuntime.getJobOperator // ここにテストを書く! } }
JobOperatorは、あらかじめ取得しておきます。
では、まずは最初のケース。Batchletからいきましょう。
it("Trace Partitioned Batchlet") { val jobId = "trace-partition-batchlet" val properties = new java.util.Properties val executionId = jobOperator.start(jobId, properties) val jobExecution = jobOperator.getJobExecution(executionId) jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS) jobExecution.getBatchStatus should be(BatchStatus.COMPLETED) }
Job XML。
src/main/resources/META-INF/batch-jobs/trace-partition-batchlet.xml
<?xml version="1.0" encoding="UTF-8"?> <job id="trace-partition-batchlet" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <step id="step"> <batchlet ref="traceBatchlet"/> <partition> <mapper ref="tracePartitionMapper"/> <reducer ref="tracePartitionReducer"/> <collector ref="tracePartitionCollector"/> <analyzer ref="tracePartitionAnalyzer"/> </partition> </step> </job>
用意した、各種Partition〜の実装はすべて使います。
実行ログ。
3 26, 2016 6:57:33 午後 org.littlewings.javaee7.batch.TracePartitionReducer beginPartitionedStep INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#beginPartitionedStep 3 26, 2016 6:57:33 午後 org.littlewings.javaee7.batch.TracePartitionMapper mapPartitions INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionMapper#mapPartitions. 3 26, 2016 6:57:33 午後 org.littlewings.javaee7.batch.TraceBatchlet process INFO: [jberet-2] org.littlewings.javaee7.batch.TraceBatchlet#process 3 26, 2016 6:57:33 午後 org.littlewings.javaee7.batch.TraceBatchlet process INFO: [jberet-5] org.littlewings.javaee7.batch.TraceBatchlet#process 3 26, 2016 6:57:33 午後 org.littlewings.javaee7.batch.TraceBatchlet process INFO: [jberet-3] org.littlewings.javaee7.batch.TraceBatchlet#process 3 26, 2016 6:57:33 午後 org.littlewings.javaee7.batch.TraceBatchlet process INFO: [jberet-4] org.littlewings.javaee7.batch.TraceBatchlet#process 3 26, 2016 6:57:33 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-1] ***** TransactionManager#begin ***** 3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData INFO: [jberet-2] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData 3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData INFO: [jberet-3] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData 3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData INFO: [jberet-5] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData 3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData INFO: [jberet-4] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData 3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-2 3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = PROCESS 3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-3 3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = PROCESS 3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-5 3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = PROCESS 3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-4 3 26, 2016 6:57:36 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = PROCESS 3 26, 2016 6:57:37 午後 org.littlewings.javaee7.batch.TracePartitionReducer beforePartitionedStepCompletion INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#beforePartitionedStepCompletion 3 26, 2016 6:57:37 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-1] ***** TransactionManager#commit ***** 3 26, 2016 6:57:37 午後 org.littlewings.javaee7.batch.TracePartitionReducer afterPartitionedStepCompletion INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#afterPartitionedStepCompletion, status = COMMIT
今回は、Partitionおよびスレッドを4つにしています。Batchletが4回、それぞれ異なるスレッドで実行されています。
Partition+Batchletでは、以下のフローになります。
- PartitionReducer#beginPartitionedStep
- PartitionMapper#mapPartitions
- Partition数分実行
- Batchlet#process (Workerスレッド)
- Batchlet#stop(Workerスレッド)
- PartitionCollector#collectPartitionData(Workerスレッド)
- begin transaction
- PartitionCollectorが返したデータ数分実行
- PartitionAnalyzer#analyzeCollectorData
- PartitionAnalyzer.analyzeStatus(ロールバックが発生する場合はPartitionReducer#rollbackPartitionedStep)
- PartitionReducer#beforePartitionedStepCompletion
- commit transaction
- PartitionReducer#afterPartitionedStepCompletion
JSR-352の「11.5 Partitioned Batchlet Processsing」を見ていると、特にトランザクションはなさそうでしたが、JBeretで動かすとPartitionAnalyzerがあるとトランザクションが動作しました…。
また、PartitionAnalyzerはBatchletの終了に関係なく動き始めます。
続いて、Chunk方式の場合。
it("Trace Partitioned Chunk") { val jobId = "trace-partition-chunk" val properties = new java.util.Properties val executionId = jobOperator.start(jobId, properties) val jobExecution = jobOperator.getJobExecution(executionId) jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS) jobExecution.getBatchStatus should be(BatchStatus.COMPLETED) }
Job XMLはこちら。
src/main/resources/META-INF/batch-jobs/trace-partition-chunk.xml
<?xml version="1.0" encoding="UTF-8"?> <job id="trace-partition-chunk" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <step id="step"> <chunk item-count="3"> <reader ref="traceItemReader"> <properties> <property name="start" value="#{partitionPlan['partition.start']}"/> <property name="step" value="3"/> </properties> </reader> <processor ref="traceItemProcessor"/> <writer ref="traceItemWriter"/> </chunk> <partition> <mapper ref="tracePartitionMapper"/> <reducer ref="tracePartitionReducer"/> <collector ref="tracePartitionCollector"/> <analyzer ref="tracePartitionAnalyzer"/> </partition> </step> </job>
ItemReaderのPropertiesで、ItemReaderが返却するデータの範囲などを設定するようにしています。
実行ログ。ちょっと長いですが…。
3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TracePartitionReducer beginPartitionedStep INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#beginPartitionedStep 3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TracePartitionMapper mapPartitions INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionMapper#mapPartitions. 3 26, 2016 6:57:40 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-1] ***** TransactionManager#begin ***** 3 26, 2016 6:57:40 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-2] ***** TransactionManager#begin ***** 3 26, 2016 6:57:40 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-4] ***** TransactionManager#begin ***** 3 26, 2016 6:57:40 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-5] ***** TransactionManager#begin ***** 3 26, 2016 6:57:40 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-3] ***** TransactionManager#begin ***** 3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TraceItemReader open INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#open 3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TraceItemReader open INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#open 3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TraceItemReader open INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#open 3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TraceItemReader open INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#open 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter open INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#open 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter open INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemWriter#open 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter open INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#open 3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-3] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter open INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#open 3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-5] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-4] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-2] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-4] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-3] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-5] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-2] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = C 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter writeItems INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#writeItems, items = [C] 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Clojure 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Java 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Kotlin 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Ruby 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Scala 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Perl 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter writeItems INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemWriter#writeItems, items = [Clojure, Kotlin, Perl] 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Groovy 3 26, 2016 6:57:43 午後 sun.reflect.NativeMethodAccessorImpl invoke0 INFO: [jberet-4] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter writeItems INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#writeItems, items = [Java, Scala, Groovy] 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Python 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-5] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData INFO: [jberet-4] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData INFO: [jberet-5] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-5] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-4 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-4] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-5 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = PHP 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-5] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-3] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter close INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#close 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-5] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter writeItems INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#writeItems, items = [Ruby, Python, PHP] 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter close INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemWriter#close 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader close INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#close 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-4] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader close INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#close 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-5] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData INFO: [jberet-4] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData INFO: [jberet-3] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-4 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData INFO: [jberet-5] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = COMPLETED 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-3] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-3 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-2] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-5 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = COMPLETED 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData INFO: [jberet-2] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-2] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-2 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-3] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-3] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-2] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter close INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#close 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-2] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader close INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#close 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemWriter close INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#close 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-3] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader close INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#close 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData INFO: [jberet-3] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-2] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-3 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = COMPLETED 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionCollector collectPartitionData INFO: [jberet-2] org.littlewings.javaee7.batch.TracePartitionCollector#collectPartitionData 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeCollectorData INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeCollectorData, data = COLLECTED-jberet-2 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionAnalyzer analyzeStatus INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionAnalyzer#analyzeStatus, batchStatus = COMPLETED, exitStatus = COMPLETED 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionReducer beforePartitionedStepCompletion INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#beforePartitionedStepCompletion 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-1] ***** TransactionManager#commit ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionReducer afterPartitionedStepCompletion INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#afterPartitionedStepCompletion, status = COMMIT
Partition+Chunkでは、以下のフローになります。
- PartitionReducer#beginPartitionedStep
- PartitionMapper#mapPartitions
- Partition数分実行
- begin transaction (Workerスレッド)
- ItemReader#open(Workerスレッド)
- ItemWrite#open(Workerスレッド)
- commit transaction(Workerスレッド)
- itemがある限り繰り返し
- begin transaction(Workerスレッド)
- ItemReader#readItem × N
- ItemProcessor#processItem(Workerスレッド)
- ItemWriter#writeItems(Workerスレッド)
- ItemReader#checkpointInfo(Workerスレッド)
- ItemWriter#checkpointInfo(Workerスレッド)
- commit transaction(Workerスレッド)
- PartitionCollector#collectPartitionData(Workerスレッド)
- begin transaction(Workerスレッド)
- ItemWriter#close(Workerスレッド)
- ItemReader#close(Workerスレッド)
- commit transaction(Workerスレッド)
- begin transaction
- PartitionCollectorが返したデータ数分実行
- PartitionAnalyzer#analyzeCollectorData
- PartitionAnalyzer.analyzeStatus(ロールバックが発生する場合はPartitionReducer#rollbackPartitionedStep)
- PartitionReducer#beforePartitionedStepCompletion
- commit transaction
- PartitionReducer#afterPartitionedStepCompletion
こちらも、PartitionAnalyzerはBatchletの終了に関係なく動き始めるので、ログがカオスなことに…。
まとめ
各種Partitionを扱った場合の動作について、少し追ってみました。PartitionReducerが思っていたのとけっこう違う場所に出現するので、ちょっと驚きました…。
まだ各インターフェース間のデータの受け渡しなどはしていないので、そのあたりはまた今度。
ソースコード上でトレースする場合は、このあたりを見るとよいでしょう。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/StepExecutionRunner.java
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/BatchletRunner.java
また、参考エントリはこちら。
Partitioned Chunk Processingで並列処理してみる
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jbatch-partitioned-tracing
オマケ
各種Partition〜インターフェースの利用は任意だと書きましたが、オマケとしてPartitionReducerのみを使用したパターンについて書いてみようと思います。
まずは、Batchletのパターン。
it("Trace Partitioned Batchlet, Reducer Only") { val jobId = "trace-partition-batchlet-reducer-only" val properties = new java.util.Properties val executionId = jobOperator.start(jobId, properties) val jobExecution = jobOperator.getJobExecution(executionId) jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS) jobExecution.getBatchStatus should be(BatchStatus.COMPLETED) }
Job XML。
src/main/resources/META-INF/batch-jobs/trace-partition-batchlet-reducer-only.xml
<?xml version="1.0" encoding="UTF-8"?> <job id="trace-partition-batchlet" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <step id="step"> <batchlet ref="traceBatchlet"/> <partition> <plan partitions="4" threads="4"> <properties partition="0"/> <properties partition="1"/> <properties partition="2"/> <properties partition="3"/> </plan> <reducer ref="tracePartitionReducer"/> </partition> </step> </job>
PartitionMapperを利用しない場合、PartitionPlanはXMLとして定義します。
実行ログ。
3 26, 2016 6:57:37 午後 org.littlewings.javaee7.batch.TracePartitionReducer beginPartitionedStep INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#beginPartitionedStep 3 26, 2016 6:57:37 午後 org.littlewings.javaee7.batch.TraceBatchlet process INFO: [jberet-4] org.littlewings.javaee7.batch.TraceBatchlet#process 3 26, 2016 6:57:37 午後 org.littlewings.javaee7.batch.TraceBatchlet process INFO: [jberet-2] org.littlewings.javaee7.batch.TraceBatchlet#process 3 26, 2016 6:57:37 午後 org.littlewings.javaee7.batch.TraceBatchlet process INFO: [jberet-5] org.littlewings.javaee7.batch.TraceBatchlet#process 3 26, 2016 6:57:37 午後 org.littlewings.javaee7.batch.TraceBatchlet process INFO: [jberet-3] org.littlewings.javaee7.batch.TraceBatchlet#process 3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TracePartitionReducer beforePartitionedStepCompletion INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#beforePartitionedStepCompletion 3 26, 2016 6:57:40 午後 org.littlewings.javaee7.batch.TracePartitionReducer afterPartitionedStepCompletion INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#afterPartitionedStepCompletion, status = COMMIT
PartitionAnalyzerがいなくなったので、JTAのトランザクションのbegin/commitがいなくなりました。
続いて、Chunk方式でReducerのみにした場合。
it("Trace Partitioned Chunk, Reducer Only") { val jobId = "trace-partition-chunk-reducer-only" val properties = new java.util.Properties val executionId = jobOperator.start(jobId, properties) val jobExecution = jobOperator.getJobExecution(executionId) jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS) jobExecution.getBatchStatus should be(BatchStatus.COMPLETED) }
Job XML。
src/main/resources/META-INF/batch-jobs/trace-partition-chunk-reducer-only.xml
<?xml version="1.0" encoding="UTF-8"?> <job id="trace-partition-chunk-reducer-only" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <step id="step"> <chunk item-count="3"> <reader ref="traceItemReader"> <properties> <property name="start" value="#{partitionPlan['partition.start']}"/> <property name="step" value="3"/> </properties> </reader> <processor ref="traceItemProcessor"/> <writer ref="traceItemWriter"/> </chunk> <partition> <plan partitions="4" threads="4"> <properties partition="0"> <property name="partition.start" value="0"/> </properties> <properties partition="1"> <property name="partition.start" value="3"/> </properties> <properties partition="2"> <property name="partition.start" value="6"/> </properties> <properties partition="3"> <property name="partition.start" value="9"/> </properties> </plan> <reducer ref="tracePartitionReducer"/> </partition> </step> </job>
こちらの場合、Propertiesをpartition単位に配列っぽく定義していく必要があります…。
実行ログ。
3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TracePartitionReducer beginPartitionedStep INFO: [jberet-1] org.littlewings.javaee7.batch.TracePartitionReducer#beginPartitionedStep 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-2] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader open INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#open 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-5] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader open INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#open 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-3] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader open INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#open 3 26, 2016 6:57:43 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-4] ***** TransactionManager#begin ***** 3 26, 2016 6:57:43 午後 org.littlewings.javaee7.batch.TraceItemReader open INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#open 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter open INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#open 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-2] ***** TransactionManager#commit ***** 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-2] ***** TransactionManager#begin ***** 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Java 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter open INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#open 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-3] ***** TransactionManager#commit ***** 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-3] ***** TransactionManager#begin ***** 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter open INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemWriter#open 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter open INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#open 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-4] ***** TransactionManager#commit ***** 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-4] ***** TransactionManager#begin ***** 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = C 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter writeItems INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#writeItems, items = [C] 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-4] ***** TransactionManager#commit ***** 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Scala 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Groovy 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter writeItems INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#writeItems, items = [Java, Scala, Groovy] 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-2] ***** TransactionManager#commit ***** 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-4] ***** TransactionManager#begin ***** 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Clojure 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Kotlin 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-5] ***** TransactionManager#commit ***** 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Perl 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-2] ***** TransactionManager#begin ***** 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter writeItems INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#writeItems, items = [Clojure, Kotlin, Perl] 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter close INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemWriter#close 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-5] ***** TransactionManager#begin ***** 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader close INFO: [jberet-4] org.littlewings.javaee7.batch.TraceItemReader#close 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-4] ***** TransactionManager#commit ***** 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-3] ***** TransactionManager#commit ***** 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Ruby 3 26, 2016 6:57:46 午後 sun.reflect.GeneratedMethodAccessor10 invoke INFO: [jberet-3] ***** TransactionManager#begin ***** 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#readItem 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo INFO: [jberet-2] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemProcessor processItem INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemProcessor#processItem, item = Python 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader checkpointInfo INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemReader#checkpointInfo 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemWriter checkpointInfo INFO: [jberet-3] org.littlewings.javaee7.batch.TraceItemWriter#checkpointInfo 3 26, 2016 6:57:46 午後 org.littlewings.javaee7.batch.TraceItemReader readItem INFO: [jberet-5] org.littlewings.javaee7.batch.TraceItemReader#readItem