最近、こちらの本を読んでみまして、久しぶりにjBatchを触ってみたいなぁと思いまして。
Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築
- 作者: 寺田佳央,猪瀬淳,加藤田益嗣,羽生田恒永,梶浦美咲
- 出版社/メーカー: 翔泳社
- 発売日: 2015/12/22
- メディア: Kindle版
- この商品を含むブログ (6件) を見る
以前、少しだけ触っていたのですが、とりあえず軽くBatchletを試しただけで、そこで終わっていました。
せっかく本を読んで興味を持ったので、どこまでやるかは微妙ですが、もう少しやってみようかなと。
まずは、Chunk方式でjBatchを書いたことが使ったことがないので、こちらから書いてみようと思います。
Chunk方式でjBatchを使う時は、ItemReader、ItemProcessor、ItemWrierの3つを実装するようです。今回は、この3つを以下のお題で、実装してみようと思います。
- ファイルに保存した坊ちゃん(http://www.aozora.gr.jp/cards/000148/files/752_14964.html)の内容を、ItemReaderで読み込み
- ItemProcessorで形態素解析して、単語分割(名詞のみ抽出)
- ItemWriterで、データベース保存
テストコードでは、最後に題材にした文書中から登場頻度Top5の名詞を抜き出して確認してみたいと思います。
jBatchの実装にはJBeret、形態素解析にはLucene Kuromoji、データベース保存にはJPA(Hibernate)、H2 Databaseを使い、実行はJava SE環境で行います。
準備
まずは、ビルド定義から。
build.sbt
name := "jbatch-chunked" version := "0.0.1-SNAPSHOT" organization := "org.littlewings" scalaVersion := "2.11.7" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) libraryDependencies ++= Seq( // jBatch "org.jboss.spec.javax.batch" % "jboss-batch-api_1.0_spec" % "1.0.0.Final", "org.jberet" % "jberet-se" % "1.2.0.Final", "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.0.Final" % "runtime", "org.jboss.weld.se" % "weld-se" % "2.3.3.Final" % "runtime", // "org.jboss" % "jandex" % "2.0.2.Final" % "runtime", "org.wildfly.security" % "wildfly-security-manager" % "1.1.2.Final" % "runtime", "org.jboss.marshalling" % "jboss-marshalling" % "1.4.10.Final" % "runtime", // JPA "org.hibernate" % "hibernate-entitymanager" % "5.1.0.Final", // "org.hibernate" % "hibernate-entitymanager" % "5.1.0.Final" exclude("org.jboss", "jandex"), // Lucene Kuromoji "org.apache.lucene" % "lucene-analyzers-kuromoji" % "5.5.0", // H2 Database "com.h2database" % "h2" % "1.4.191", // Test "org.scalatest" %% "scalatest" % "2.2.6" % "test" )
ScalaTestは、テストコード用です。
依存関係については、以前書いたコードと、JBeretのドキュメントを参考にしています。
ItemReaderの実装
それでは、ItemReaderの実装から。
src/main/scala/org/littlewings/javaee7/batch/MyItemReader.scala
package org.littlewings.javaee7.batch import java.io.{BufferedReader, Serializable} import java.nio.charset.StandardCharsets import java.nio.file.{Paths, Files} import javax.batch.api.BatchProperty import javax.batch.api.chunk.ItemReader import javax.enterprise.context.Dependent import javax.inject.{Inject, Named} import org.jboss.logging.Logger @Dependent @Named("MyItemReader") class MyItemReader extends ItemReader { private val logger: Logger = Logger.getLogger(getClass) @Inject @BatchProperty private var filePath: String = _ private var reader: BufferedReader = _ override def open(checkpoint: Serializable): Unit = { logger.infof("Input File = %s.", filePath) reader = Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8) } override def readItem(): AnyRef = reader.readLine() override def checkpointInfo(): Serializable = { logger.infof("checkpoint.") null } override def close(): Unit = reader.close() }
こちらは、指定されたファイルをオープンして、readItemメソッド中でBufferedReaderを使い、行単位に内容を返しているだけです。
nullを返すと、終了となるみたいですね。
ItemProcessor
形態素解析を行う、ItemProcessor。
src/main/scala/org/littlewings/javaee7/batch/MyItemProcessor.scala
package org.littlewings.javaee7.batch import javax.batch.api.chunk.ItemProcessor import javax.enterprise.context.Dependent import javax.inject.Named import org.apache.lucene.analysis.Analyzer import org.apache.lucene.analysis.ja.JapaneseAnalyzer import org.apache.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute import org.apache.lucene.analysis.tokenattributes.CharTermAttribute import org.jboss.logging.Logger import scala.collection.JavaConverters._ @Dependent @Named("MyItemProcessor") class MyItemProcessor extends ItemProcessor { private val logger: Logger = Logger.getLogger(getClass) private val analyzer: Analyzer = new JapaneseAnalyzer override def processItem(item: scala.Any): AnyRef = { logger.infof("process item.") val tokenStream = analyzer.tokenStream("", item.asInstanceOf[String]) val charTermAttr = tokenStream.addAttribute(classOf[CharTermAttribute]) val partOfSpeechAttr = tokenStream.addAttribute(classOf[PartOfSpeechAttribute]) tokenStream.reset() try { Iterator .continually(tokenStream.incrementToken()) .takeWhile(identity) .filter(_ => partOfSpeechAttr.getPartOfSpeech.contains("名詞")) .map(_ => charTermAttr.toString) .toList } finally { tokenStream.end() tokenStream.close() } } }
Lucene KuromojiのJapaneseAnalyzerで形態素解析し、PartOfSpeechAttributeを使って名詞のみにフィルタをかけています。
ItemWriterとEntity
jBatchの要素で実装するものとしては、最後にItemWriter。
…の前に、データベース保存に使うJPAのEntityを先に。
src/main/scala/org/littlewings/javaee7/batch/Word.scala
package org.littlewings.javaee7.batch import javax.persistence.{Column, Id, Entity, Table} import scala.beans.BeanProperty object Word { def apply(token: String, count: Int = 1): Word = { val w = new Word w.token = token w.count = count w } } @Entity @Table(name = "word") @SerialVersionUID(1L) class Word extends Serializable { @Id @BeanProperty var token: String = _ @Column(name = "count") var count: Int = _ }
単語と、出現回数を持つEntityとしました。objectおよびapplyを定義したのは、呼び出し元のnewを端折りたいだけの理由です…。
そして、ItemWriter。
src/main/scala/org/littlewings/javaee7/batch/MyItemWriter.scala
package org.littlewings.javaee7.batch import java.io.Serializable import javax.batch.api.chunk.ItemWriter import javax.enterprise.context.Dependent import javax.inject.Named import javax.persistence.Persistence import org.jboss.logging.Logger import scala.collection.JavaConverters._ import scala.collection.mutable @Dependent @Named("MyItemWriter") class MyItemWriter extends ItemWriter { private val logger: Logger = Logger.getLogger(getClass) private val store: mutable.Map[String, Word] = new mutable.HashMap().withDefault(t => Word(t, 0)) override def open(checkpoint: Serializable): Unit = logger.infof("open writer.") override def writeItems(items: java.util.List[AnyRef]): Unit = { logger.infof(s"write items[${items.size}].") items .asScala .foreach { case tokens: Seq[_] => tokens.foreach { case token: String => store += (token -> Word(token, store(token).count + 1)) } } } override def checkpointInfo(): Serializable = { logger.infof("checkpoint.") null } override def close(): Unit = { val emf = Persistence.createEntityManagerFactory("javaee7.batch.pu") val em = emf.createEntityManager val tx = em.getTransaction tx.begin() store.values.foreach(em.persist) tx.commit() em.close() emf.close() logger.infof("close.") } }
writeItemsメソッド内で都度データベース保存してもよかったのですが、今回はいったん全部メモリ内に持たせ、最後のclose時にデータベースに反映することにしました。
Job XML
今回作った、各種クラスを使ったジョブを定義します。
src/main/resources/META-INF/batch-jobs/my-job.xml
<?xml version="1.0" encoding="UTF-8"?> <job id="my-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <step id="step"> <chunk item-count="3"> <reader ref="MyItemReader"> <properties> <property name="filePath" value="#{jobParameters['filePath']}"/> </properties> </reader> <processor ref="MyItemProcessor"/> <writer ref="MyItemWriter"/> </chunk> </step> </job>
ジョブのidは「my-job」、Chunkでのitem-countは今回は小さく3としてみました。これは、ログ出力で確認します。
プロパティとしては、ItemReaderで読ませるファイルパスを渡せるようにしています。
beans.xml/persistence.xml
CDIとJPAを使っているので、それぞれの設定を。
src/main/resources/META-INF/beans.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd" bean-discovery-mode="annotated"> </beans>
src/main/resources/META-INF/persistence.xml
<?xml version="1.0" encoding="UTF-8"?> <persistence xmlns="http://xmlns.jcp.org/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd" version="2.1"> <persistence-unit name="javaee7.batch.pu" transaction-type="RESOURCE_LOCAL"> <provider>org.hibernate.jpa.HibernatePersistenceProvider</provider> <properties> <property name="hibernate.dialect" value="org.hibernate.dialect.H2Dialect"/> <property name="javax.persistence.jdbc.driver" value="org.h2.Driver"/> <property name="javax.persistence.jdbc.url" value="jdbc:h2:mem:test;DB_CLOSE_DELAY=-1"/> <property name="hibernate.show_sql" value="false"/> <property name="hibernate.format_sql" value="false"/> <property name="hibernate.hbm2ddl.auto" value="update"/> </properties> </persistence-unit> </persistence>
入力ファイル
読み込み対象となるファイルの一部。
bocchan.txt
親譲りの無鉄砲で小供の時から損ばかりしている。小学校に居る時分学校の二階から飛び降りて一週間ほど腰を抜かした事がある。なぜそんな無闇をしたと聞く人があるかも知れぬ。別段深い理由でもない。新築の二階から首を出していたら、同級生の一人が冗談に、いくら威張っても、そこから飛び降りる事は出来まい。弱虫やーい。と囃したからである。小使に負ぶさって帰って来た時、おやじが大きな眼をして二階ぐらいから飛び降りて腰を抜かす奴があるかと云ったから、この次は抜かさずに飛んで見せますと答えた。 親類のものから西洋製のナイフを貰って奇麗な刃を日に翳して、友達に見せていたら、一人が光る事は光るが切れそうもないと云った。切れぬ事があるか、何でも切ってみせると受け合った。そんなら君の指を切ってみろと注文したから、何だ指ぐらいこの通りだと右の手の親指の甲をはすに切り込んだ。幸ナイフが小さいのと、親指の骨が堅かったので、今だに親指は手に付いている。しかし創痕は死ぬまで消えぬ。 〜長いので省略〜
テストコード
それでは、最後に動作確認のためテストコードを書きます。
src/test/scala/org/littlewings/javaee7/batch/ChunkedBatchSpec.scala
package org.littlewings.javaee7.batch import javax.persistence.Persistence import org.jberet.se.Main import org.scalatest.{FunSpec, Matchers} class ChunkedBatchSpec extends FunSpec with Matchers { describe("Chunked jBatch") { it("process file word count, top 5.") { // JBeret run. Main.main(Array("my-job", "filePath=bocchan.txt")) val emf = Persistence.createEntityManagerFactory("javaee7.batch.pu") val em = emf.createEntityManager val query = em .createQuery("SELECT w FROM Word w ORDER BY w.count DESC, w.token ASC", classOf[Word]) .setMaxResults(5) val results = query.getResultList results should have size (5) results.get(0).token should be ("おれ") results.get(0).count should be (471) results.get(1).token should be ("事") results.get(1).count should be (291) results.get(2).token should be ("人") results.get(2).count should be (213) results.get(3).token should be ("君") results.get(3).count should be (184) results.get(4).token should be ("赤") results.get(4).count should be (178) em.close() emf.close() } } }
jBatchの起動には、jberet-seのMainクラスを使って簡略化しました。
また、「キー=値」の形式でプロパティを渡せるようなので、こちらでItemReaderで読み込むファイルパスを指定。
// JBeret run. Main.main(Array("my-job", "filePath=bocchan.txt"))
結果は、テストコードのとおりです。「おれ」が1番よく出てくるらしい…。
また、実行時のログはこんな感じになっていて、item-countを3に指定しているので、3レコード単位でItemWriter#writeItemsが呼び出されていることがわかりますね。
INFO: Input File = bocchan.txt. 3 03, 2016 9:30:34 午後 org.littlewings.javaee7.batch.MyItemWriter open INFO: open writer. 3 03, 2016 9:30:34 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: process item. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: process item. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: process item. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems INFO: write items[3]. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemReader checkpointInfo INFO: checkpoint. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemWriter checkpointInfo INFO: checkpoint. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: process item. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: process item. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: process item. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems INFO: write items[3]. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemReader checkpointInfo INFO: checkpoint. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemWriter checkpointInfo INFO: checkpoint. 〜省略〜 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: process item. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: process item. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemProcessor processItem INFO: process item. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems INFO: write items[3]. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemReader checkpointInfo INFO: checkpoint. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemWriter checkpointInfo INFO: checkpoint. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemReader checkpointInfo INFO: checkpoint. 3 03, 2016 9:30:35 午後 org.littlewings.javaee7.batch.MyItemWriter checkpointInfo INFO: checkpoint. 3 03, 2016 9:30:37 午後 org.littlewings.javaee7.batch.MyItemWriter close INFO: close.
まとめ
はじめてのChunk方式のjBatchということで、簡単なWord Countのサンプルを書いてみました。
実際に書いている時は、jBatch以外の場所でハマり(CDI…Weld SEやJPAがうまく動かなかったり)…けっこう苦戦しましたが、最終的にはなんとかなりました。
まだ手探りなので、もう少し追っていこうかなと。
今回書いたコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jbatch-chunked