CLOVER🍀

That was when it all began.

jBatchのStep Partitioningで、StepContextのTransientUserDataを使って中間結果を共有してみる

jBatchでStep Partitioningを使った時の、Chunk(もしくはBatchlet)とPartitionCollector間でデータを外部保存領域なしで、オンメモリで共有するための一案です。

Step Partitioninigと実行順やスレッドなどの関係は、JSR-352、もしくはこちらをご覧ください。

jBatchのStep Partitioningの実行順をトレースしてみる - CLOVER

Step Partitioningを使用した時、分割されたPartition内にあるPartitionCollectorとChunkもしくはBatchletが並列に動作することになりますが、この時ChunkもしくはBatchletと、PartitionCollectorの間のデータの受け渡し方が気になります。

Chunk(item-countごと)およびBatchletのあとにPartitionCollector#collectPartitionDataが起動するのですが、PartitionCollector#collectPartitionDataは引数を取りません。データを連携する場合は、どうやって行いましょう?

まあ、通常はファイルやデータベースなどを介するのがよいかもなのですが…。

今回は、オンメモリでなんとかする方向で調べてみました。

結論としては、StepContextのTransientUserDataを使用すれば、管理単位がスレッド別に分かれていてなんとかなりそうだったので、こちらを利用してみました。

参考)
Partitioned Chunk Processingで並列処理してみる

jBatchのpartitionを使ってみる - kagamihogeの日記

なお、参考エントリに書かれていますが、JobContext#TransientUserDataの場合はジョブでひとつのため、並列処理全体で共有されてしまい、ちょっとうまくいかなくなります。

それでは、試してみましょう。

お題としては、Step Partitioningを使用して並列処理を行い、ItemWriterでオンメモリに保存したデータをPartitionCollectorで読み出し加工する、です。結果自体は、PartitionAnalyzerが受け取ります。

準備

まずは、ビルド定義。
build.sbt

name := "jbatch-partitioned"

version := "0.0.1-SNAPSHOT"

organization := "org.littlewings"

scalaVersion := "2.11.8"

updateOptions := updateOptions.value.withCachedResolution(true)

scalacOptions := Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xexperimental")

parallelExecution in Test := false

libraryDependencies ++= Seq(
  // jBatch
  "org.jboss.spec.javax.batch" % "jboss-batch-api_1.0_spec" % "1.0.0.Final",
  "org.jberet" % "jberet-se" % "1.2.0.Final",
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.0.Final" % "runtime",
  "org.jboss.weld.se" % "weld-se" % "2.3.3.Final" % "runtime",
  "org.jboss" % "jandex" % "2.0.2.Final" % "runtime",
  "org.wildfly.security" % "wildfly-security-manager" % "1.1.2.Final" % "runtime",
  "org.jboss.marshalling" % "jboss-marshalling" % "1.4.10.Final" % "runtime",

  // Logging
  "org.jboss.logging" % "jboss-logging" % "3.3.0.Final",

  // Test
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

CDIも利用するので、beans.xmlを用意します。
src/main/resources/META-INF/beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
                           http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
       bean-discovery-mode="annotated">
</beans>

Chunkの実装

では、まずはChunkから実装していきます。今回はItemProcessorは不要だったので、ItemReaderとItemWriterのみ実装しました。

ItemReaderは、Partitionで分割される前提で範囲を決めてデータを返します。
src/main/scala/org/littlewings/javaee7/batch/MyItemReader.scala

package org.littlewings.javaee7.batch

import javax.batch.api.BatchProperty
import javax.batch.api.chunk.AbstractItemReader
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

import org.jboss.logging.Logger

@Dependent
@Named
class MyItemReader extends AbstractItemReader {
  val logger: Logger = Logger.getLogger(getClass)

  var languages: Iterator[String] = _

  @Inject
  @BatchProperty
  var start: Int = _

  @Inject
  @BatchProperty
  var step: Int = _

  override def open(checkpoint: java.io.Serializable): Unit = {
    logger.infof(s"[${Thread.currentThread.getName}] ${getClass.getName}#open")

    languages =
      List("Java", "Scala", "Groovy", "Clojure", "Kotlin", "Perl", "Ruby", "Python", "PHP", "C")
        .drop(start)
        .take(step)
        .iterator
  }

  override def readItem(): AnyRef =
    if (languages.hasNext) languages.next
    else null
}

ItemWriterは、item-count単位でデータを溜め込むことを考えて、Bufferで持つように実装。
src/main/scala/org/littlewings/javaee7/batch/MyItemWriter.scala

package org.littlewings.javaee7.batch

import javax.batch.api.chunk.AbstractItemWriter
import javax.batch.runtime.context.StepContext
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

@Dependent
@Named
class MyItemWriter extends AbstractItemWriter {
  @Inject
  var stepContext: StepContext = _

  override def writeItems(items: java.util.List[AnyRef]): Unit =
    stepContext.getTransientUserData match {
      case null =>
        val buffer = new ArrayBuffer[String]
        items.asScala.foreach(item => buffer += item.asInstanceOf[String])
        stepContext.setTransientUserData(buffer)
      case buffer =>
        items.asScala.foreach(item => buffer.asInstanceOf[ArrayBuffer[String]] += item.asInstanceOf[String])
        stepContext.setTransientUserData(buffer)
    }
}

データは、StepContext#setTransientUserDataで保存します。

PartitionCollectorおよびPartitionAnalyzer

続いて、ItemWriterの書き込んだ結果を受け取るPartitionCollector。
src/main/scala/org/littlewings/javaee7/batch/MyPartitionCollector.scala

package org.littlewings.javaee7.batch

import java.io.Serializable
import javax.batch.api.BatchProperty
import javax.batch.api.partition.PartitionCollector
import javax.batch.runtime.context.StepContext
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

import scala.collection.mutable.ArrayBuffer

@Dependent
@Named
class MyPartitionCollector extends PartitionCollector {
  @Inject
  @BatchProperty
  var prefix: String = _

  @Inject
  @BatchProperty
  var suffix: String = _

  @Inject
  var stepContext: StepContext = _

  override def collectPartitionData(): Serializable = {
    val buffer = stepContext.getTransientUserData.asInstanceOf[ArrayBuffer[String]]

    stepContext.setTransientUserData(null)

    if (buffer != null)
      buffer.map(prefix + _ + suffix).mkString(s"processed thread[${Thread.currentThread.getName}] ", ", ", "")
    else
      s"processed thread[${Thread.currentThread.getName}] empty"
  }
}

StepContext#getTransientUserDataでItemWriterが書き出したBufferを取得したら、setTransientUserDataでnullに設定(空のBufferでもいいんですが)。

受け取ったBufferは、prefixおよびsuffixを付与してBufferの内容をカンマ区切りで文字列化します。

つまり、ItemReaderおよびItemWriterからitem-count単位で渡されたデータを、その単位で文字列化します、と。また、この時、動作しているスレッドが分かるようにスレッド名を文字列に含めるようにしています。

ここでTransientUserDataの設定内容を空にしておかないと、全部のitemを都度累積してしまって、文字列化したデータが重複するようになってしまいます。

ここまでは、Chunk(ItemReaderおよびItemWriter)と同じスレッドで動作します。


そして、PartitionAnalyzer。こちらは、メインのスレッドで動作し、PartitionCollector#collectPartitionDataが返却した内容を受け取ります。

今回は、単純にログ出力するように実装してみました。こちらでは、ChunkやPartitionCollectorと違うスレッドで動作していることを確認するために、スレッド名をログ出力しています。
src/main/scala/org/littlewings/javaee7/batch/MyPartitionAnalyzer.scala

package org.littlewings.javaee7.batch

import java.io.Serializable
import javax.batch.api.partition.PartitionAnalyzer
import javax.batch.runtime.BatchStatus
import javax.enterprise.context.Dependent
import javax.inject.Named

import org.jboss.logging.Logger

@Dependent
@Named
class MyPartitionAnalyzer extends PartitionAnalyzer {
  val logger: Logger = Logger.getLogger(getClass)

  override def analyzeCollectorData(data: Serializable): Unit = {
    logger.infof(s"[${Thread.currentThread.getName}] received data = ${data}")
  }

  override def analyzeStatus(batchStatus: BatchStatus, exitStatus: String): Unit = ()
}

Job XML

これらの要素を組み上げる、Job XML
src/main/resources/META-INF/batch-jobs/job.xml

<?xml version="1.0" encoding="UTF-8"?>
<job id="job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="step">
        <chunk item-count="2">
            <reader ref="myItemReader">
                <properties>
                    <property name="start" value="#{partitionPlan['partition.start']}"/>
                    <property name="step" value="#{partitionPlan['partition.step']}"/>
                </properties>
            </reader>
            <writer ref="myItemWriter"/>
        </chunk>
        <partition>
            <plan partitions="4">
                <properties partition="0">
                    <property name="partition.start" value="0"/>
                    <property name="partition.step" value="3"/>
                </properties>
                <properties partition="1">
                    <property name="partition.start" value="3"/>
                    <property name="partition.step" value="3"/>
                </properties>
                <properties partition="2">
                    <property name="partition.start" value="6"/>
                    <property name="partition.step" value="3"/>
                </properties>
                <properties partition="3">
                    <property name="partition.start" value="9"/>
                    <property name="partition.step" value="3"/>
                </properties>
            </plan>
            <collector ref="myPartitionCollector">
                <properties>
                    <property name="prefix" value="★"/>
                    <property name="suffix" value="★"/>
                </properties>
            </collector>
            <analyzer ref="myPartitionAnalyzer"/>
        </partition>
    </step>
</job>

Chunkのitem-countは2とし、各Partitionは3ずつずらして要素を扱うようにしました。

テストコード

最後は、実行するだけのテストコード。
src/test/scala/org/littlewings/javaee7/batch/PartitionedSpec.scala

package org.littlewings.javaee7.batch

import java.util.concurrent.TimeUnit
import javax.batch.runtime.{BatchRuntime, BatchStatus}

import org.jberet.runtime.JobExecutionImpl
import org.scalatest.{FunSpec, Matchers}

class PartitionedSpec extends FunSpec with Matchers {
  describe("jBatch Partitioned") {
    it("execute") {
      val jobOperator = BatchRuntime.getJobOperator

      val jobId = "job"
      val properties = new java.util.Properties

      val executionId = jobOperator.start(jobId, properties)

      val jobExecution = jobOperator.getJobExecution(executionId)
      jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS)

      jobExecution.getBatchStatus should be(BatchStatus.COMPLETED)
    }
  }
}

なので、PartitionCollectorが生成したデータに含まれているスレッド名と、PartitionAnalyzerがログ出力するスレッド名が異なるようになるはずですね、と。

こちらは、まあ単純にジョブを動作させるだけです。

動作確認

それでは、動かしてみます。

出力されたログは、こんな感じです。

4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: [jberet-4] org.littlewings.javaee7.batch.MyItemReader#open
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: [jberet-5] org.littlewings.javaee7.batch.MyItemReader#open
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: [jberet-2] org.littlewings.javaee7.batch.MyItemReader#open
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: [jberet-3] org.littlewings.javaee7.batch.MyItemReader#open
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] received data = processed thread[jberet-5] ★C★
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] received data = processed thread[jberet-3] ★Clojure★, ★Kotlin★
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] received data = processed thread[jberet-3] ★Perl★
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] received data = processed thread[jberet-3] empty
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] received data = processed thread[jberet-4] ★Ruby★, ★Python★
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] received data = processed thread[jberet-2] ★Java★, ★Scala★
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] received data = processed thread[jberet-4] ★PHP★
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] received data = processed thread[jberet-2] ★Groovy★
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] received data = processed thread[jberet-4] empty
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] received data = processed thread[jberet-2] empty
4 07, 2016 10:56:52 午後 org.littlewings.javaee7.batch.MyPartitionAnalyzer analyzeCollectorData
INFO: [jberet-1] received data = processed thread[jberet-5] empty

ChunkとPartitionCollectorの間のデータの受け渡しが行われていて、PartitionAnalyzerはPartitionCollectorと異なるスレッドで動作していることが確認できます。

ちょっと実装的に

少し、jBatchのこのあたりの実装についても確認してみましょう。

jBeretにはなりますが、こちらからStepContextがChunkに割り当てられたスレッド単位に作成されていることがわかります。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/CompositeExecutionRunner.java#L156-L157

なので、StepContextはStep Partitioningを使用した場合は、PartitionごとにStepContextが別々に使えるようになる、と。

また、PartitionCollector#collectPartitionDataで返した値が、別のスレッドで動作しているPartitionAnalyzerに渡る仕掛けですが、こちらはQueueが使用されています。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java#L216

PartitionAnalyzerは、Step全体を見ているStepExecutionRunnerで動作していて、Queueに入れられたデータを取得次第処理を行います。
https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-core/src/main/java/org/jberet/runtime/runner/StepExecutionRunner.java#L376

よって、PartitionAnalyzerはすべてのChunkの処理(もしくはBatchletの処理)が終わっていない状態でも、処理が始まります、と。

まとめ

ChunkとPartitionCollector間で、メモリ内でのデータの受け渡しをやってみました。

StepContextがPartition別のスレッドで別々で管理されているのでTransientUserDataを使用しましたが、item-count単位の繰り返しの度に中間結果が累積することが嫌な場合はデータをいったん空にしたりする必要が出たりするので、実際に使うならやっぱりファイルとかデータベースになるのかな、と思いました。

まあ、jBatchの実装側からも動作するスレッドなどの確認もできたので、今回はこれでいいかなと。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jbatch-partitioned