jBatchでStep Partitioningを使った時の、Chunk(もしくはBatchlet)とPartitionCollector間でデータを外部保存領域なしで、オンメモリで共有するための一案です。
Step Partitioninigと実行順やスレッドなどの関係は、JSR-352、もしくはこちらをご覧ください。
jBatchのStep Partitioningの実行順をトレースしてみる - CLOVER
Step Partitioningを使用した時、分割されたPartition内にあるPartitionCollectorとChunkもしくはBatchletが並列に動作することになりますが、この時ChunkもしくはBatchletと、PartitionCollectorの間のデータの受け渡し方が気になります。
Chunk(item-countごと)およびBatchletのあとにPartitionCollector#collectPartitionDataが起動するのですが、PartitionCollector#collectPartitionDataは引数を取りません。データを連携する場合は、どうやって行いましょう?
まあ、通常はファイルやデータベースなどを介するのがよいかもなのですが…。
今回は、オンメモリでなんとかする方向で調べてみました。
結論としては、StepContextのTransientUserDataを使用すれば、管理単位がスレッド別に分かれていてなんとかなりそうだったので、こちらを利用してみました。
参考)
Partitioned Chunk Processingで並列処理してみる
jBatchのpartitionを使ってみる - kagamihogeの日記
なお、参考エントリに書かれていますが、JobContext#TransientUserDataの場合はジョブでひとつのため、並列処理全体で共有されてしまい、ちょっとうまくいかなくなります。
それでは、試してみましょう。
お題としては、Step Partitioningを使用して並列処理を行い、ItemWriterでオンメモリに保存したデータをPartitionCollectorで読み出し加工する、です。結果自体は、PartitionAnalyzerが受け取ります。
準備
まずは、ビルド定義。
build.sbt
name := "jbatch-partitioned" version := "0.0.1-SNAPSHOT" organization := "org.littlewings" scalaVersion := "2.11.8" updateOptions := updateOptions.value.withCachedResolution(true) scalacOptions := Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xexperimental") parallelExecution in Test := false libraryDependencies ++= Seq( // jBatch "org.jboss.spec.javax.batch" % "jboss-batch-api_1.0_spec" % "1.0.0.Final", "org.jberet" % "jberet-se" % "1.2.0.Final", "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.0.Final" % "runtime", "org.jboss.weld.se" % "weld-se" % "2.3.3.Final" % "runtime", "org.jboss" % "jandex" % "2.0.2.Final" % "runtime", "org.wildfly.security" % "wildfly-security-manager" % "1.1.2.Final" % "runtime", "org.jboss.marshalling" % "jboss-marshalling" % "1.4.10.Final" % "runtime", // Logging "org.jboss.logging" % "jboss-logging" % "3.3.0.Final", // Test "org.scalatest" %% "scalatest" % "2.2.6" % "test" )
CDIも利用するので、beans.xmlを用意します。
src/main/resources/META-INF/beans.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd" bean-discovery-mode="annotated"> </beans>
Chunkの実装
では、まずはChunkから実装していきます。今回はItemProcessorは不要だったので、ItemReaderとItemWriterのみ実装しました。
ItemReaderは、Partitionで分割される前提で範囲を決めてデータを返します。
src/main/scala/org/littlewings/javaee7/batch/MyItemReader.scala
package org.littlewings.javaee7.batch import javax.batch.api.BatchProperty import javax.batch.api.chunk.AbstractItemReader import javax.enterprise.context.Dependent import javax.inject.{Inject, Named} import org.jboss.logging.Logger @Dependent @Named class MyItemReader extends AbstractItemReader { val logger: Logger = Logger.getLogger(getClass) var languages: Iterator[String] = _ @Inject @BatchProperty var start: Int = _ @Inject @BatchProperty var step: Int = _ override def open(checkpoint: java.io.Serializable): Unit = { logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#open") languages = List("Java", "Scala", "Groovy", "Clojure", "Kotlin", "Perl", "Ruby", "Python", "PHP", "C") .drop(start) .take(step) .iterator } override def readItem(): AnyRef = if (languages.hasNext) languages.next else null }
ItemWriterは、item-count単位でデータを溜め込むことを考えて、Bufferで持つように実装。
src/main/scala/org/littlewings/javaee7/batch/MyItemWriter.scala
package org.littlewings.javaee7.batch import javax.batch.api.chunk.AbstractItemWriter import javax.batch.runtime.context.StepContext import javax.enterprise.context.Dependent import javax.inject.{Inject, Named} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @Dependent @Named class MyItemWriter extends AbstractItemWriter { @Inject var stepContext: StepContext = _ override def writeItems(items: java.util.List[AnyRef]): Unit = stepContext.getTransientUserData match { case null => val buffer = new ArrayBuffer[String] items.asScala.foreach(item => buffer += item.asInstanceOf[String]) stepContext.setTransientUserData(buffer) case buffer => items.asScala.foreach(item => buffer.asInstanceOf[ArrayBuffer[String]] += item.asInstanceOf[String]) stepContext.setTransientUserData(buffer) } }
データは、StepContext#setTransientUserDataで保存します。
PartitionCollectorおよびPartitionAnalyzer
続いて、ItemWriterの書き込んだ結果を受け取るPartitionCollector。
src/main/scala/org/littlewings/javaee7/batch/MyPartitionCollector.scala
package org.littlewings.javaee7.batch import java.io.Serializable import javax.batch.api.BatchProperty import javax.batch.api.partition.PartitionCollector import javax.batch.runtime.context.StepContext import javax.enterprise.context.Dependent import javax.inject.{Inject, Named} import scala.collection.mutable.ArrayBuffer @Dependent @Named class MyPartitionCollector extends PartitionCollector { @Inject @BatchProperty var prefix: String = _ @Inject @BatchProperty var suffix: String = _ @Inject var stepContext: StepContext = _ override def collectPartitionData(): Serializable = { val buffer = stepContext.getTransientUserData.asInstanceOf[ArrayBuffer[String]] stepContext.setTransientUserData(null) if (buffer != null) buffer.map(prefix + _ + suffix).mkString(s"processed thread[${Thread.currentThread.getName}] ", ", ", "") else s"processed thread[${Thread.currentThread.getName}] empty" } }
StepContext#getTransientUserDataでItemWriterが書き出したBufferを取得したら、setTransientUserDataでnullに設定(空のBufferでもいいんですが)。
受け取ったBufferは、prefixおよびsuffixを付与してBufferの内容をカンマ区切りで文字列化します。
つまり、ItemReaderおよびItemWriterからitem-count単位で渡されたデータを、その単位で文字列化します、と。また、この時、動作しているスレッドが分かるようにスレッド名を文字列に含めるようにしています。
ここでTransientUserDataの設定内容を空にしておかないと、全部のitemを都度累積してしまって、文字列化したデータが重複するようになってしまいます。
ここまでは、Chunk(ItemReaderおよびItemWriter)と同じスレッドで動作します。
そして、PartitionAnalyzer。こちらは、メインのスレッドで動作し、PartitionCollector#collectPartitionDataが返却した内容を受け取ります。
今回は、単純にログ出力するように実装してみました。こちらでは、ChunkやPartitionCollectorと違うスレッドで動作していることを確認するために、スレッド名をログ出力しています。
src/main/scala/org/littlewings/javaee7/batch/MyPartitionAnalyzer.scala
package org.littlewings.javaee7.batch import java.io.Serializable import javax.batch.api.partition.PartitionAnalyzer import javax.batch.runtime.BatchStatus import javax.enterprise.context.Dependent import javax.inject.Named import org.jboss.logging.Logger @Dependent @Named class MyPartitionAnalyzer extends PartitionAnalyzer { val logger: Logger = Logger.getLogger(getClass) override def analyzeCollectorData(data: Serializable): Unit = { logger.infof(s"[${Thread.currentThread.getName}] received data = ${data}") } override def analyzeStatus(batchStatus: BatchStatus, exitStatus: String): Unit = () }
Job XML
これらの要素を組み上げる、Job XML。
src/main/resources/META-INF/batch-jobs/job.xml
<?xml version="1.0" encoding="UTF-8"?> <job id="job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <step id="step"> <chunk item-count="2"> <reader ref="myItemReader"> <properties> <property name="start" value="#{partitionPlan['partition.start']}"/> <property name="step" value="#{partitionPlan['partition.step']}"/> </properties> </reader> <writer ref="myItemWriter"/> </chunk> <partition> <plan partitions="4"> <properties partition="0"> <property name="partition.start" value="0"/> <property name="partition.step" value="3"/> </properties> <properties partition="1"> <property name="partition.start" value="3"/> <property name="partition.step" value="3"/> </properties> <properties partition="2"> <property name="partition.start" value="6"/> <property name="partition.step" value="3"/> </properties> <properties partition="3"> <property name="partition.start" value="9"/> <property name="partition.step" value="3"/> </properties> </plan> <collector ref="myPartitionCollector"> <properties> <property name="prefix" value="★"/> <property name="suffix" value="★"/> </properties> </collector> <analyzer ref="myPartitionAnalyzer"/> </partition> </step> </job>
Chunkのitem-countは2とし、各Partitionは3ずつずらして要素を扱うようにしました。
テストコード
最後は、実行するだけのテストコード。
src/test/scala/org/littlewings/javaee7/batch/PartitionedSpec.scala
package org.littlewings.javaee7.batch import java.util.concurrent.TimeUnit import javax.batch.runtime.{BatchRuntime, BatchStatus} import org.jberet.runtime.JobExecutionImpl import org.scalatest.{FunSpec, Matchers} class PartitionedSpec extends FunSpec with Matchers { describe("jBatch Partitioned") { it("execute") { val jobOperator = BatchRuntime.getJobOperator val jobId = "job" val properties = new java.util.Properties val executionId = jobOperator.start(jobId, properties) val jobExecution = jobOperator.getJobExecution(executionId) jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS) jobExecution.getBatchStatus should be(BatchStatus.COMPLETED) } } }
なので、PartitionCollectorが生成したデータに含まれているスレッド名と、PartitionAnalyzerがログ出力するスレッド名が異なるようになるはずですね、と。
こちらは、まあ単純にジョブを動作させるだけです。
動作確認
それでは、動かしてみます。
出力されたログは、こんな感じです。
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyItemReader open INFO: [jberet-4] org.littlewings.javaee7.batch.MyItemReader#open 4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyItemReader open INFO: [jberet-5] org.littlewings.javaee7.batch.MyItemReader#open 4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyItemReader open INFO: [jberet-2] org.littlewings.javaee7.batch.MyItemReader#open 4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyItemReader open INFO: [jberet-3] org.littlewings.javaee7.batch.MyItemReader#open 4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData INFO: [jberet-1] received data = processed thread[jberet-5] ★C★ 4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData INFO: [jberet-1] received data = processed thread[jberet-3] ★Clojure★, ★Kotlin★ 4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData INFO: [jberet-1] received data = processed thread[jberet-3] ★Perl★ 4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData INFO: [jberet-1] received data = processed thread[jberet-3] empty 4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData INFO: [jberet-1] received data = processed thread[jberet-4] ★Ruby★, ★Python★ 4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData INFO: [jberet-1] received data = processed thread[jberet-2] ★Java★, ★Scala★ 4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData INFO: [jberet-1] received data = processed thread[jberet-4] ★PHP★ 4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData INFO: [jberet-1] received data = processed thread[jberet-2] ★Groovy★ 4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData INFO: [jberet-1] received data = processed thread[jberet-4] empty 4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData INFO: [jberet-1] received data = processed thread[jberet-2] empty 4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData INFO: [jberet-1] received data = processed thread[jberet-5] empty
ChunkとPartitionCollectorの間のデータの受け渡しが行われていて、PartitionAnalyzerはPartitionCollectorと異なるスレッドで動作していることが確認できます。
ちょっと実装的に
少し、jBatchのこのあたりの実装についても確認してみましょう。
jBeretにはなりますが、こちらからStepContextがChunkに割り当てられたスレッド単位に作成されていることがわかります。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/CompositeExecutionRunner.java#L156-L157
なので、StepContextはStep Partitioningを使用した場合は、PartitionごとにStepContextが別々に使えるようになる、と。
また、PartitionCollector#collectPartitionDataで返した値が、別のスレッドで動作しているPartitionAnalyzerに渡る仕掛けですが、こちらはQueueが使用されています。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L216
PartitionAnalyzerは、Step全体を見ているStepExecutionRunnerで動作していて、Queueに入れられたデータを取得次第処理を行います。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/StepExecutionRunner.java#L376
よって、PartitionAnalyzerはすべてのChunkの処理(もしくはBatchletの処理)が終わっていない状態でも、処理が始まります、と。
まとめ
ChunkとPartitionCollector間で、メモリ内でのデータの受け渡しをやってみました。
StepContextがPartition別のスレッドで別々で管理されているのでTransientUserDataを使用しましたが、item-count単位の繰り返しの度に中間結果が累積することが嫌な場合はデータをいったん空にしたりする必要が出たりするので、実際に使うならやっぱりファイルとかデータベースになるのかな、と思いました。
まあ、jBatchの実装側からも動作するスレッドなどの確認もできたので、今回はこれでいいかなと。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jbatch-partitioned