CLOVER🍀

That was when it all began.

jBatchのChunk方式を動かしてみる

最近、こちらの本を読んでみまして、久しぶりにjBatchを触ってみたいなぁと思いまして。

Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築

Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築

以前、少しだけ触っていたのですが、とりあえず軽くBatchletを試しただけで、そこで終わっていました。

せっかく本を読んで興味を持ったので、どこまでやるかは微妙ですが、もう少しやってみようかなと。

まずは、Chunk方式でjBatchを書いたことが使ったことがないので、こちらから書いてみようと思います。

Chunk方式でjBatchを使う時は、ItemReader、ItemProcessor、ItemWrierの3つを実装するようです。今回は、この3つを以下のお題で、実装してみようと思います。

テストコードでは、最後に題材にした文書中から登場頻度Top5の名詞を抜き出して確認してみたいと思います。

jBatchの実装にはJBeret、形態素解析にはLucene Kuromoji、データベース保存にはJPAHibernate)、H2 Databaseを使い、実行はJava SE環境で行います。

準備

まずは、ビルド定義から。
build.sbt

name := "jbatch-chunked"

version := "0.0.1-SNAPSHOT"

organization := "org.littlewings"

scalaVersion := "2.11.7"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

libraryDependencies ++= Seq(
  // jBatch
  "org.jboss.spec.javax.batch" % "jboss-batch-api_1.0_spec" % "1.0.0.Final",
  "org.jberet" % "jberet-se" % "1.2.0.Final",
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.0.Final" % "runtime",
  "org.jboss.weld.se" % "weld-se" % "2.3.3.Final" % "runtime",
  // "org.jboss" % "jandex" % "2.0.2.Final" % "runtime",
  "org.wildfly.security" % "wildfly-security-manager" % "1.1.2.Final" % "runtime",
  "org.jboss.marshalling" % "jboss-marshalling" % "1.4.10.Final" % "runtime",

  // JPA
  "org.hibernate" % "hibernate-entitymanager" % "5.1.0.Final",
  // "org.hibernate" % "hibernate-entitymanager" % "5.1.0.Final" exclude("org.jboss", "jandex"),

  // Lucene Kuromoji
  "org.apache.lucene" % "lucene-analyzers-kuromoji" % "5.5.0",

  // H2 Database
  "com.h2database" % "h2" % "1.4.191",

  // Test
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

ScalaTestは、テストコード用です。

依存関係については、以前書いたコードと、JBeretのドキュメントを参考にしています。

Set up JBeret | JBeret User Guide

jBatch(JBeret)をJava SE環境で動かす - CLOVER

ItemReaderの実装

それでは、ItemReaderの実装から。
src/main/scala/org/littlewings/javaee7/batch/MyItemReader.scala

package org.littlewings.javaee7.batch

import java.io.{BufferedReader, Serializable}
import java.nio.charset.StandardCharsets
import java.nio.file.{Paths, Files}
import javax.batch.api.BatchProperty
import javax.batch.api.chunk.ItemReader
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

import org.jboss.logging.Logger

@Dependent
@Named("MyItemReader")
class MyItemReader extends ItemReader {
  private val logger: Logger = Logger.getLogger(getClass)

  @Inject
  @BatchProperty
  private var filePath: String = _

  private var reader: BufferedReader = _

  override def open(checkpoint: Serializable): Unit = {
    logger.infof("Input File = %s.", filePath)
    reader = Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8)
  }

  override def readItem(): AnyRef = reader.readLine()

  override def checkpointInfo(): Serializable = {
    logger.infof("checkpoint.")
    null
  }

  override def close(): Unit = reader.close()
}

こちらは、指定されたファイルをオープンして、readItemメソッド中でBufferedReaderを使い、行単位に内容を返しているだけです。

nullを返すと、終了となるみたいですね。

ItemProcessor

形態素解析を行う、ItemProcessor。
src/main/scala/org/littlewings/javaee7/batch/MyItemProcessor.scala

package org.littlewings.javaee7.batch

import javax.batch.api.chunk.ItemProcessor
import javax.enterprise.context.Dependent
import javax.inject.Named

import org.apache.lucene.analysis.Analyzer
import org.apache.lucene.analysis.ja.JapaneseAnalyzer
import org.apache.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.jboss.logging.Logger

import scala.collection.JavaConverters._

@Dependent
@Named("MyItemProcessor")
class MyItemProcessor extends ItemProcessor {
  private val logger: Logger = Logger.getLogger(getClass)

  private val analyzer: Analyzer = new JapaneseAnalyzer

  override def processItem(item: scala.Any): AnyRef = {
    logger.infof("process item.")

    val tokenStream = analyzer.tokenStream("", item.asInstanceOf[String])
    val charTermAttr = tokenStream.addAttribute(classOf[CharTermAttribute])
    val partOfSpeechAttr = tokenStream.addAttribute(classOf[PartOfSpeechAttribute])

    tokenStream.reset()

    try {
      Iterator
        .continually(tokenStream.incrementToken())
        .takeWhile(identity)
        .filter(_ => partOfSpeechAttr.getPartOfSpeech.contains("名詞"))
        .map(_ => charTermAttr.toString)
        .toList
    } finally {
      tokenStream.end()
      tokenStream.close()
    }
  }
}

Lucene KuromojiのJapaneseAnalyzerで形態素解析し、PartOfSpeechAttributeを使って名詞のみにフィルタをかけています。

ItemWriterとEntity

jBatchの要素で実装するものとしては、最後にItemWriter。

…の前に、データベース保存に使うJPAのEntityを先に。
src/main/scala/org/littlewings/javaee7/batch/Word.scala

package org.littlewings.javaee7.batch

import javax.persistence.{Column, Id, Entity, Table}

import scala.beans.BeanProperty

object Word {
  def apply(token: String, count: Int = 1): Word = {
    val w = new Word
    w.token = token
    w.count = count
    w
  }
}

@Entity
@Table(name = "word")
@SerialVersionUID(1L)
class Word extends Serializable {
  @Id
  @BeanProperty
  var token: String = _

  @Column(name = "count")
  var count: Int = _
}

単語と、出現回数を持つEntityとしました。objectおよびapplyを定義したのは、呼び出し元のnewを端折りたいだけの理由です…。

そして、ItemWriter。
src/main/scala/org/littlewings/javaee7/batch/MyItemWriter.scala

package org.littlewings.javaee7.batch

import java.io.Serializable
import javax.batch.api.chunk.ItemWriter
import javax.enterprise.context.Dependent
import javax.inject.Named
import javax.persistence.Persistence

import org.jboss.logging.Logger

import scala.collection.JavaConverters._
import scala.collection.mutable

@Dependent
@Named("MyItemWriter")
class MyItemWriter extends ItemWriter {
  private val logger: Logger = Logger.getLogger(getClass)

  private val store: mutable.Map[String, Word] =
    new mutable.HashMap().withDefault(t => Word(t, 0))

  override def open(checkpoint: Serializable): Unit =
    logger.infof("open writer.")

  override def writeItems(items: java.util.List[AnyRef]): Unit = {
    logger.infof(s"write items[${items.size}].")

    items
      .asScala
      .foreach {
        case tokens: Seq[_] => tokens.foreach {
          case token: String => store += (token -> Word(token, store(token).count + 1))
        }
      }
  }

  override def checkpointInfo(): Serializable = {
    logger.infof("checkpoint.")
    null
  }

  override def close(): Unit = {
    val emf = Persistence.createEntityManagerFactory("javaee7.batch.pu")
    val em = emf.createEntityManager

    val tx = em.getTransaction
    tx.begin()
    store.values.foreach(em.persist)
    tx.commit()

    em.close()
    emf.close()

    logger.infof("close.")
  }
}

writeItemsメソッド内で都度データベース保存してもよかったのですが、今回はいったん全部メモリ内に持たせ、最後のclose時にデータベースに反映することにしました。

Job XML

今回作った、各種クラスを使ったジョブを定義します。
src/main/resources/META-INF/batch-jobs/my-job.xml

<?xml version="1.0" encoding="UTF-8"?>
<job id="my-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="step">
        <chunk item-count="3">
            <reader ref="MyItemReader">
                <properties>
                    <property name="filePath" value="#{jobParameters['filePath']}"/>
                </properties>
            </reader>
            <processor ref="MyItemProcessor"/>
            <writer ref="MyItemWriter"/>
        </chunk>
    </step>
</job>

ジョブのidは「my-job」、Chunkでのitem-countは今回は小さく3としてみました。これは、ログ出力で確認します。

プロパティとしては、ItemReaderで読ませるファイルパスを渡せるようにしています。

beans.xml/persistence.xml

CDIJPAを使っているので、それぞれの設定を。
src/main/resources/META-INF/beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
                           http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
       bean-discovery-mode="annotated">
</beans>

src/main/resources/META-INF/persistence.xml

<?xml version="1.0" encoding="UTF-8"?>
<persistence xmlns="http://xmlns.jcp.org/xml/ns/persistence"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence
                                 http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd"
             version="2.1">
    <persistence-unit name="javaee7.batch.pu" transaction-type="RESOURCE_LOCAL">
        <provider>org.hibernate.jpa.HibernatePersistenceProvider</provider>
        <properties>
            <property name="hibernate.dialect" value="org.hibernate.dialect.H2Dialect"/>
            <property name="javax.persistence.jdbc.driver" value="org.h2.Driver"/>
            <property name="javax.persistence.jdbc.url"
                      value="jdbc:h2:mem:test;DB_CLOSE_DELAY=-1"/>
            <property name="hibernate.show_sql" value="false"/>
            <property name="hibernate.format_sql" value="false"/>
            <property name="hibernate.hbm2ddl.auto" value="update"/>
        </properties>
    </persistence-unit>
</persistence>

入力ファイル

読み込み対象となるファイルの一部。
bocchan.txt

親譲りの無鉄砲で小供の時から損ばかりしている。小学校に居る時分学校の二階から飛び降りて一週間ほど腰を抜かした事がある。なぜそんな無闇をしたと聞く人があるかも知れぬ。別段深い理由でもない。新築の二階から首を出していたら、同級生の一人が冗談に、いくら威張っても、そこから飛び降りる事は出来まい。弱虫やーい。と囃したからである。小使に負ぶさって帰って来た時、おやじが大きな眼をして二階ぐらいから飛び降りて腰を抜かす奴があるかと云ったから、この次は抜かさずに飛んで見せますと答えた。
親類のものから西洋製のナイフを貰って奇麗な刃を日に翳して、友達に見せていたら、一人が光る事は光るが切れそうもないと云った。切れぬ事があるか、何でも切ってみせると受け合った。そんなら君の指を切ってみろと注文したから、何だ指ぐらいこの通りだと右の手の親指の甲をはすに切り込んだ。幸ナイフが小さいのと、親指の骨が堅かったので、今だに親指は手に付いている。しかし創痕は死ぬまで消えぬ。
〜長いので省略〜

テストコード

それでは、最後に動作確認のためテストコードを書きます。
src/test/scala/org/littlewings/javaee7/batch/ChunkedBatchSpec.scala

package org.littlewings.javaee7.batch

import javax.persistence.Persistence

import org.jberet.se.Main
import org.scalatest.{FunSpec, Matchers}

class ChunkedBatchSpec extends FunSpec with Matchers {
  describe("Chunked jBatch") {
    it("process file word count, top 5.") {
      // JBeret run.
      Main.main(Array("my-job", "filePath=bocchan.txt"))

      val emf = Persistence.createEntityManagerFactory("javaee7.batch.pu")
      val em = emf.createEntityManager

      val query =
        em
          .createQuery("SELECT w FROM Word w ORDER BY w.count DESC, w.token ASC", classOf[Word])
          .setMaxResults(5)
      val results = query.getResultList

      results should have size (5)
      results.get(0).token should be ("おれ")
      results.get(0).count should be (471)
      results.get(1).token should be ("事")
      results.get(1).count should be (291)
      results.get(2).token should be ("人")
      results.get(2).count should be (213)
      results.get(3).token should be ("君")
      results.get(3).count should be (184)
      results.get(4).token should be ("赤")
      results.get(4).count should be (178)

      em.close()
      emf.close()
    }
  }
}

jBatchの起動には、jberet-seのMainクラスを使って簡略化しました。

また、「キー=値」の形式でプロパティを渡せるようなので、こちらでItemReaderで読み込むファイルパスを指定。

      // JBeret run.
      Main.main(Array("my-job", "filePath=bocchan.txt"))

https://github.com/jberet/jsr352/blob/1.2.0.Final/jberet-se/src/main/java/org/jberet/se/Main.java#L38

結果は、テストコードのとおりです。「おれ」が1番よく出てくるらしい…。

また、実行時のログはこんな感じになっていて、item-countを3に指定しているので、3レコード単位でItemWriter#writeItemsが呼び出されていることがわかりますね。

INFO: Input File = bocchan.txt.
3 03, 2016 9:30:34 午後 org.littlewings.javaee7.batch.MyItemWriter open
INFO: open writer.
3 03, 2016 9:30:34 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: process item.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: process item.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: process item.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items[3].
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemReader checkpointInfo
INFO: checkpoint.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemWriter checkpointInfo
INFO: checkpoint.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: process item.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: process item.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: process item.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items[3].
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemReader checkpointInfo
INFO: checkpoint.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemWriter checkpointInfo
INFO: checkpoint.

〜省略〜

3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: process item.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: process item.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem
INFO: process item.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items[3].
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemReader checkpointInfo
INFO: checkpoint.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemWriter checkpointInfo
INFO: checkpoint.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemReader checkpointInfo
INFO: checkpoint.
3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemWriter checkpointInfo
INFO: checkpoint.

3 03, 2016 9:30:37 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: close.

まとめ

はじめてのChunk方式のjBatchということで、簡単なWord Countのサンプルを書いてみました。

実際に書いている時は、jBatch以外の場所でハマり(CDI…Weld SEやJPAがうまく動かなかったり)…けっこう苦戦しましたが、最終的にはなんとかなりました。

まだ手探りなので、もう少し追っていこうかなと。

今回書いたコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jbatch-chunked