In JBeret, an implementation of jBatch, you can choose among several data stores for the Job Repository.
The following implementations are provided:
- In Memory (default)
- JDBC
- MongoDB
- Infinispan (Embedded Mode)
As listed above, the default Job Repository is In Memory.
How to switch the Job Repository is covered briefly in the Set up section of the documentation.
See "Configure JBeret Batch Runtime" there.
The configuration apparently differs between running on WildFly or JBoss EAP and running on Java SE (JBeret SE). This time I'll run it on Java SE.
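The task is set up as follows:
- Build a job with two steps: one using an ItemReader/ItemWriter and one using a Batchlet
- Each step can be made to fail on demand via parameters (and is restarted afterwards)
- The ItemReader uses a checkpoint
- The Job Repository is JDBC (MySQL)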
Let's try it.
Preparation
For this task, these project dependencies are enough:
libraryDependencies ++= Seq(
  // JBeret SE
  "org.jberet" % "jberet-se" % "1.2.3.Final" % Compile,
  "org.jboss.spec.javax.batch" % "jboss-batch-api_1.0_spec" % "1.0.0.Final" % Compile,
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.0.Final" % Runtime,
  "javax.enterprise" % "cdi-api" % "1.1" % Compile,
  "javax.inject" % "javax.inject" % "1" % Compile,
  "org.jboss.weld.se" % "weld-se" % "2.4.4.Final" % Runtime,
  "org.wildfly.security" % "wildfly-security-manager" % "1.1.2.Final" % Runtime,
  "org.jboss.marshalling" % "jboss-marshalling" % "1.4.11.Final" % Runtime,
  "org.jboss.logging" % "jboss-logging" % "3.3.1.Final" % Compile,
  "org.jboss" % "jandex" % "2.0.3.Final" % Runtime,
  "com.google.guava" % "guava" % "18.0" % Runtime,

  // JDBC Driver for Job Repository
  "mysql" % "mysql-connector-java" % "5.1.42" % Runtime,

  // for Test
  "org.scalatest" %% "scalatest" % "3.0.3" % Test
)
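The JBeret dependencies are listed in the Set up documentation, so use that as a reference.
Since the target is MySQL, I added its JDBC driver, plus ScalaTest for the test code.
For MySQL itself, all you need is a reachable server and a user with privileges to create and query tables.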
Sample code
First, let's write the sample code.
We start with the ItemReader.
src/main/scala/org/littlewings/javaee7/batch/MyItemReader.scala
package org.littlewings.javaee7.batch

import javax.batch.api.BatchProperty
import javax.batch.api.chunk.AbstractItemReader
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

import org.jboss.logging.Logger

@Named
@Dependent
class MyItemReader extends AbstractItemReader {
  private[this] val logger: Logger = Logger.getLogger(getClass)

  private[this] var items: Iterator[Int] = _

  private[this] var checkpoint: MyCheckPoint = _

  @Inject
  @BatchProperty
  private[this] var readerFail: Boolean = _

  @Inject
  @BatchProperty
  private[this] var readerFailCount: Int = _

  override def open(checkpoint: java.io.Serializable): Unit = {
    // null on the first run, the saved MyCheckPoint on restart
    checkpoint match {
      case null => this.checkpoint = new MyCheckPoint
      case cp: MyCheckPoint => this.checkpoint = cp
    }

    logger.infof("item reader start, from %d", this.checkpoint.counter)

    // resume reading from the saved position
    items = (this.checkpoint.counter until 100).iterator
  }

  override def readItem(): AnyRef = {
    // fail on purpose when the readerFail/readerFailCount properties say so
    if (readerFail && checkpoint.counter == readerFailCount) {
      throw new RuntimeException(s"Oops!! ItemReader, count = ${checkpoint.counter}")
    }

    checkpoint.counter += 1

    if (checkpoint.counter % 25 == 0) {
      logger.infof("read count = %d", checkpoint.counter)
    }

    // returning null tells the runtime that there are no more items
    if (items.hasNext) {
      Integer.valueOf(items.next())
    } else {
      null
    }
  }

  override def close(): Unit = logger.info("item reader end")

  override def checkpointInfo(): java.io.Serializable = checkpoint
}
For its records, the ItemReader simply uses Integers generated from a Range/Iterator.
items = (this.checkpoint.counter until 100).iterator
I also added @BatchProperty fields so that the reader can be made to fail at an arbitrary point.
@Inject
@BatchProperty
private[this] var readerFail: Boolean = _

@Inject
@BatchProperty
private[this] var readerFailCount: Int = _

if (readerFail && checkpoint.counter == readerFailCount) {
  throw new RuntimeException(s"Oops!! ItemReader, count = ${checkpoint.counter}")
}
The checkpoint is used only by the ItemReader this time, and it simply holds the current position.
src/main/scala/org/littlewings/javaee7/batch/MyCheckPoint.scala

package org.littlewings.javaee7.batch

// holds the reader's current position; stored in the Job Repository at each checkpoint
@SerialVersionUID(1L)
class MyCheckPoint extends Serializable {
  var counter: Int = _
}
The ItemWriter just logs the number of records it receives in each chunk, and the total when it closes.
src/main/scala/org/littlewings/javaee7/batch/MyItemWriter.scala
package org.littlewings.javaee7.batch

import javax.batch.api.chunk.AbstractItemWriter
import javax.enterprise.context.Dependent
import javax.inject.Named

import org.jboss.logging.Logger

@Named
@Dependent
class MyItemWriter extends AbstractItemWriter {
  private[this] val logger: Logger = Logger.getLogger(getClass)

  // items written during this step execution (kept in memory only, not checkpointed)
  private[this] var counter: Int = _

  override def open(checkpoint: java.io.Serializable): Unit = logger.info("item writer start")

  override def writeItems(items: java.util.List[AnyRef]): Unit = {
    // called once per chunk with up to item-count items
    logger.infof("write items, size = %d", items.size)
    counter += items.size()
  }

  override def close(): Unit = {
    logger.infof("total count = %d", counter)
    logger.info("item writer end")
  }
}
The Batchlet only writes a log line when it runs, but it can also be made to fail via a property.
src/main/scala/org/littlewings/javaee7/batch/MyBatchlet.scala
package org.littlewings.javaee7.batch

import javax.batch.api.{AbstractBatchlet, BatchProperty}
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

import org.jboss.logging.Logger

@Named
@Dependent
class MyBatchlet extends AbstractBatchlet {
  private[this] val logger: Logger = Logger.getLogger(getClass)

  @Inject
  @BatchProperty
  private[this] var batchletFail: Boolean = _

  override def process(): String = {
    logger.info("process batchlet")

    if (batchletFail) {
      throw new RuntimeException("Oops!! Batchlet")
    }

    // the returned string becomes the step's exit status
    "Batchlet Success"
  }
}
Next come the configuration files.
First, the job definition file. It registers the ItemReader/ItemWriter step and the Batchlet step separately, and lets the ItemReader and the Batchlet receive properties. The chunk size is 25.
src/main/resources/META-INF/batch-jobs/my-job.xml
<?xml version="1.0" encoding="UTF-8"?>
<job id="my-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="chunk-step" next="batchlet-step">
        <chunk item-count="25">
            <reader ref="myItemReader">
                <properties>
                    <property name="readerFail" value="#{jobParameters['readerFail']}"/>
                    <property name="readerFailCount" value="#{jobParameters['readerFailCount']}"/>
                </properties>
            </reader>
            <writer ref="myItemWriter"/>
        </chunk>
    </step>

    <step id="batchlet-step">
        <batchlet ref="myBatchlet">
            <properties>
                <property name="batchletFail" value="#{jobParameters['batchletFail']}"/>
            </properties>
        </batchlet>
    </step>
</job>
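The `#{jobParameters['...']}` expressions above are filled in by the batch runtime from the Properties passed to JobOperator#start, and the resulting strings are then injected into the @BatchProperty fields. To illustrate the idea only, here is a simplified sketch; `JobParameterSubstitution` is a hypothetical helper, not a JBeret API, and it ignores the other operators the spec defines (jobProperties, systemProperties, partitionPlan, `?:` defaults). Treating an unknown parameter as an empty string is an assumption of this sketch.

```scala
import java.util.Properties
import scala.util.matching.Regex

// Hypothetical, simplified sketch of #{jobParameters['name']} substitution.
// Not JBeret code; real JSR 352 runtimes support more operators.
object JobParameterSubstitution {
  private val JobParam: Regex = """#\{jobParameters\['([^']*)'\]\}""".r

  def resolve(expression: String, jobParameters: Properties): String =
    JobParam.replaceAllIn(expression, m =>
      // Assumption for this sketch: an unknown parameter becomes ""
      Regex.quoteReplacement(jobParameters.getProperty(m.group(1), ""))
    )
}
```

With the parameters of the failing run later in this article (`readerFailCount` = 45), the reader's `readerFailCount` property would resolve to the string "45".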
Now let's configure JBeret's Job Repository.
With JBeret SE, you provide a file named "jberet.properties". On WildFly or JBoss EAP, this is apparently done through
the subsystem configuration instead.
src/main/resources/jberet.properties
job-repository-type = jdbc
db-url = jdbc:mysql://172.17.0.2:3306/practice
db-user = kazuhira
db-password = password
db-properties = useUnicode=true:characterEncoding=utf-8:characterSetResults=utf-8:useServerPrepStmts=true:useLocalSessionState=true:elideSetAutoCommits=true:alwaysSendSetIsolation=false:useSSL=false
#ddl-file = sql/jberet-mysql.ddl
To use JDBC for the Job Repository, set "job-repository-type" to "jdbc".
job-repository-type = jdbc
This "jberet.properties" file is read by the following class:
https://github.com/jberet/jsr352/blob/1.2.3.Final/jberet-se/src/main/java/org/jberet/se/BatchSEEnvironment.java
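The Set up page covers this only briefly,
so for the details you end up reading the source code. For example, I could not work out how to write "db-properties"
(a list of key=value pairs separated by ':', as in the sample above) without looking at the source: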
https://github.com/jberet/jsr352/blob/1.2.3.Final/jberet-core/src/main/java/org/jberet/repository/JdbcRepository.java
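To make the file format concrete, here is a small sketch that reads jberet.properties-style text with java.util.Properties and splits db-properties, assuming exactly the ':'-separated key=value format used in the sample. `JberetPropertiesSketch` is a hypothetical helper, not JBeret code; JdbcRepository.java is the authority on how JBeret really parses the value.

```scala
import java.io.StringReader
import java.util.Properties

// Hypothetical helper (not JBeret code) mirroring the jberet.properties sample.
object JberetPropertiesSketch {
  val sample: String =
    """job-repository-type = jdbc
      |db-url = jdbc:mysql://172.17.0.2:3306/practice
      |db-properties = useUnicode=true:characterEncoding=utf-8:useSSL=false
      |#ddl-file = sql/jberet-mysql.ddl
      |""".stripMargin

  // Standard java.util.Properties parsing; '#' lines are comments
  def load(text: String): Properties = {
    val props = new Properties
    props.load(new StringReader(text))
    props
  }

  // Assumption: db-properties is a ':'-separated list of key=value pairs
  def parseDbProperties(value: String): Properties = {
    val props = new Properties
    value.split(":").map(_.trim).filter(_.nonEmpty).foreach { pair =>
      val idx = pair.indexOf('=')
      if (idx > 0) props.setProperty(pair.substring(0, idx).trim, pair.substring(idx + 1).trim)
    }
    props
  }
}
```

The colons inside db-url are fine, because Properties only treats the first separator after the key specially. Under this format, however, a single db-properties value could not itself contain ':'.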
The other Job Repository implementations live in the same package, so take a look if you are curious:
https://github.com/jberet/jsr352/tree/1.2.3.Final/jberet-core/src/main/java/org/jberet/repository
Also, as the documentation says, four tables are required. You can write the DDL yourself (in that case, point
"ddl-file" at the file containing it), but this time I'll let JBeret create them.
JBeret ships DDL for a number of RDBMSs,
and it picks the DDL to run based on runtime information about the connected database.
For MySQL, this one is used:
https://github.com/jberet/jsr352/blob/1.2.3.Final/jberet-core/src/main/resources/sql/jberet-mysql.ddl
The DDL for the other RDBMSs is in the same directory, if you are interested:
https://github.com/jberet/jsr352/tree/1.2.3.Final/jberet-core/src/main/resources/sql
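The selection step can be pictured roughly as follows. This is a hypothetical sketch, not JBeret's code: an explicitly configured `ddl-file` wins, and otherwise the database product name (as a JDBC driver reports it through `java.sql.DatabaseMetaData#getDatabaseProductName`) is matched against known prefixes. Only the MySQL mapping is confirmed, by the JBERET000021 log line later in this article; the prefix matching, `FallbackDdl` and all names here are assumptions, and JdbcRepository.java holds the real logic.

```scala
// Hypothetical sketch of choosing a DDL file; not JBeret's actual implementation.
object DdlSelectionSketch {
  // Only this entry is confirmed (by the JBERET000021 log output)
  private val ddlByProductPrefix: Seq[(String, String)] = Seq(
    "MySQL" -> "sql/jberet-mysql.ddl"
  )

  // Hypothetical fallback name, for illustration only
  val FallbackDdl: String = "sql/jberet.ddl"

  // A configured ddl-file wins; otherwise match on the database product name
  def chooseDdl(configuredDdlFile: Option[String], databaseProductName: String): String =
    configuredDdlFile.getOrElse {
      ddlByProductPrefix
        .collectFirst { case (prefix, ddl) if databaseProductName.trim.startsWith(prefix) => ddl }
        .getOrElse(FallbackDdl)
    }
}
```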
The SQL statements JBeret uses are defined here:
https://github.com/jberet/jsr352/blob/1.2.3.Final/jberet-core/src/main/resources/sql/jberet-sql.properties
Finally, since we use CDI, we add a beans.xml.
src/main/resources/META-INF/beans.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
                           http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
       bean-discovery-mode="annotated">
</beans>
Test code and verification
Let's write some test code and run the batch.
Here is the skeleton of the test class.
src/test/scala/org/littlewings/javaee7/batch/BatchJdbcJobRepositorySpec.scala
package org.littlewings.javaee7.batch

import java.util.Properties
import java.util.concurrent.TimeUnit
import javax.batch.runtime.{BatchRuntime, BatchStatus}

import org.jberet.runtime.JobExecutionImpl
import org.jboss.logging.Logger
import org.scalatest.{FunSuite, Matchers}

class BatchJdbcJobRepositorySpec extends FunSuite with Matchers {
  val logger: Logger = Logger.getLogger(getClass)

  // Write the tests here!
}
In it, I'll write tests where the job completes normally and where it fails at a specific step.
First, the case that completes normally.
  test("success case") {
    logger.info("[success case] start")

    val jobOperator = BatchRuntime.getJobOperator

    val properties = new Properties
    properties.put("readerFail", "false")
    properties.put("readerFailCount", "0")
    properties.put("batchletFail", "false")

    val executionId = jobOperator.start("my-job", properties)
    val jobExecution = jobOperator.getJobExecution(executionId)
    // JobOperator#start is asynchronous, so wait for the job to finish
    jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS)

    jobExecution.getBatchStatus should be(BatchStatus.COMPLETED)

    logger.infof("[success case] end, execution-id = %d", executionId)
  }
Before running it, let's check MySQL. The database has no tables yet.
mysql> show tables;
Empty set (0.00 sec)
Let's run it.
> test
The log output looks like this:
6 17, 2017 1:12:24 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$1
INFO: [success case] start
6 17, 2017 1:12:24 午後 org.jberet.repository.JdbcRepository getDDLLocation
INFO: JBERET000021: About to initialize batch job repository with ddl-file: sql/jberet-mysql.ddl for database MySQL
6 17, 2017 1:12:25 午後 org.jboss.weld.bootstrap.WeldStartup <clinit>
INFO: WELD-000900: 2.4.4 (Final)
6 17, 2017 1:12:25 午後 org.jboss.weld.environment.deployment.discovery.DiscoveryStrategyFactory create
INFO: WELD-ENV-000020: Using jandex for bean discovery
6 17, 2017 1:12:25 午後 org.jboss.weld.bootstrap.WeldStartup startContainer
INFO: WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
6 17, 2017 1:12:25 午後 org.jboss.weld.environment.se.WeldContainer fireContainerInitializedEvent
INFO: WELD-ENV-002003: Weld SE container STATIC_INSTANCE initialized
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: item reader start, from 0
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter open
INFO: item writer start
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 50
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 75
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 100
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: total count = 100
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: item writer end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader close
INFO: item reader end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: process batchlet
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$1
INFO: [success case] end, execution-id = 1
Looking closely, there is a log line showing which DDL was chosen:
6 17, 2017 1:12:24 午後 org.jberet.repository.JdbcRepository getDDLLocation
INFO: JBERET000021: About to initialize batch job repository with ddl-file: sql/jberet-mysql.ddl for database MySQL
On the MySQL side, the tables have now been created.
mysql> show tables;
+---------------------+
| Tables_in_practice  |
+---------------------+
| JOB_EXECUTION       |
| JOB_INSTANCE        |
| PARTITION_EXECUTION |
| STEP_EXECUTION      |
+---------------------+
4 rows in set (0.01 sec)
Their contents look like what you would expect:
mysql> select * from JOB_EXECUTION where JOBEXECUTIONID = 1;
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
| JOBEXECUTIONID | JOBINSTANCEID | VERSION | CREATETIME | STARTTIME | ENDTIME | LASTUPDATEDTIME | BATCHSTATUS | EXITSTATUS | JOBPARAMETERS | RESTARTPOSITION |
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
| 1 | 1 | NULL | 2017-06-17 13:12:25 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | COMPLETED | COMPLETED | readerFail = false batchletFail = false readerFailCount = 0 | NULL |
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
1 row in set (0.00 sec)

mysql> select * from JOB_INSTANCE;
+---------------+---------+---------+-----------------+
| JOBINSTANCEID | VERSION | JOBNAME | APPLICATIONNAME |
+---------------+---------+---------+-----------------+
|             1 | NULL    | my-job  | NULL            |
+---------------+---------+---------+-----------------+
1 row in set (0.00 sec)

mysql> select * from PARTITION_EXECUTION;
Empty set (0.00 sec)

mysql> select * from STEP_EXECUTION where JOBEXECUTIONID = 1;
+-----------------+----------------+---------+---------------+---------------------+---------------------+-------------+------------------+--------------------+--------------------+-----------+------------+-------------+---------------+---------------+------------------+-------------+----------------+-------------------------------------------------------------------------------+----------------------+
| STEPEXECUTIONID | JOBEXECUTIONID | VERSION | STEPNAME | STARTTIME | ENDTIME | BATCHSTATUS | EXITSTATUS | EXECUTIONEXCEPTION | PERSISTENTUSERDATA | READCOUNT | WRITECOUNT | COMMITCOUNT | ROLLBACKCOUNT | READSKIPCOUNT | PROCESSSKIPCOUNT | FILTERCOUNT | WRITESKIPCOUNT | READERCHECKPOINTINFO | WRITERCHECKPOINTINFO |
+-----------------+----------------+---------+---------------+---------------------+---------------------+-------------+------------------+--------------------+--------------------+-----------+------------+-------------+---------------+---------------+------------------+-------------+----------------+-------------------------------------------------------------------------------+----------------------+
| 1 | 1 | NULL | chunk-step | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | COMPLETED | COMPLETED | NULL | NULL | 100 | 100 | 5 | 0 | 0 | 0 | 0 | 0 | �� sr *org.littlewings.javaee7.batch.MyCheckPoint ^A^B ^AI counterxp e | NULL |
| 2 | 1 | NULL | batchlet-step | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | COMPLETED | Batchlet Success | NULL | NULL | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | NULL | NULL |
+-----------------+----------------+---------+---------------+---------------------+---------------------+-------------+------------------+--------------------+--------------------+-----------+------------+-------------+---------------+---------------+------------------+-------------+----------------+-------------------------------------------------------------------------------+----------------------+
2 rows in set (0.00 sec)
The STEP_EXECUTION table holds the step information, along with what looks like serialized data.
The PARTITION_EXECUTION table is empty, presumably because this job does not use partitions.
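The READERCHECKPOINTINFO bytes appear to be MyCheckPoint in standard Java serialization format. This is my own reading of the bytes, not something the JBeret documentation states: the leading `��` would be the stream magic 0xACED, `sr` the object and class-descriptor markers, `*` the class-name length (42), and the trailing `e` (0x65 = 101) or `^Y` (0x19 = 25) the value of `counter`. The sketch below serializes an object of the same shape with ObjectOutputStream so the layout can be checked; `CheckpointBytes` is a hypothetical helper.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Same shape as MyCheckPoint above: a single Int field named "counter"
@SerialVersionUID(1L)
class MyCheckPoint extends java.io.Serializable {
  var counter: Int = _
}

object CheckpointBytes {
  def serialize(obj: java.io.Serializable): Array[Byte] = {
    val bytes = new ByteArrayOutputStream
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  def deserialize(data: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject() finally in.close()
  }

  // For a class whose only field is a primitive int, the field value is
  // written last, as 4 big-endian bytes
  def trailingInt(data: Array[Byte]): Int =
    ByteBuffer.wrap(data, data.length - 4, 4).getInt
}
```

Serializing a checkpoint with `counter = 101` therefore ends in the byte 0x65, which the mysql client shows as `e`.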
Next is the case where the ItemReader/ItemWriter step fails.
  test("reader fail and restart") {
    logger.info("[reader fail and restart] start")

    val jobOperator = BatchRuntime.getJobOperator

    // first fail
    val p1 = new Properties
    p1.put("readerFail", "true")
    p1.put("readerFailCount", "45")
    p1.put("batchletFail", "false")

    val executionId = jobOperator.start("my-job", p1)
    val jobExecution = jobOperator.getJobExecution(executionId)
    jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0L, TimeUnit.SECONDS)

    jobExecution.getBatchStatus should be(BatchStatus.FAILED)

    logger.info("first job end.")

    // retry
    val p2 = new Properties
    p2.put("readerFail", "false")
    p2.put("readerFailCount", "0")
    p2.put("batchletFail", "false")

    // restart() takes the previous execution id and returns a new one
    val restartExecutionId = jobOperator.restart(executionId, p2)
    executionId should not be (restartExecutionId)

    val restartedJobExecution = jobOperator.getJobExecution(restartExecutionId)
    restartedJobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0L, TimeUnit.SECONDS)

    restartedJobExecution.getBatchStatus should be(BatchStatus.COMPLETED)

    logger.info("restart job end")

    logger.infof("[reader fail and restart] end, execution-id first = %d, restarted = %d", executionId, restartExecutionId)
  }
To restart a job, you call JobOperator#restart with the execution ID of the previous run.
val restartExecutionId = jobOperator.restart(executionId, p2)
In other words, you need the information about the previous execution.
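The relationship behind this, which the JOB_EXECUTION and JOB_INSTANCE tables show further down (a restart gets a new JOBEXECUTIONID but keeps the JOBINSTANCEID), can be pictured with a tiny in-memory model. `RestartModel` is hypothetical and is not how JdbcRepository works internally. The real JobOperator#restart also rejects executions that are not the most recent one for their job instance, signalling problems with exceptions such as NoSuchJobExecutionException or JobExecutionAlreadyCompleteException; this sketch simplifies that to plain Scala exceptions.

```scala
import scala.collection.mutable

// Hypothetical model of JOB_INSTANCE / JOB_EXECUTION bookkeeping; not JBeret code.
object RestartModel {
  sealed trait Status
  case object Started extends Status
  case object Completed extends Status
  case object Failed extends Status
  case object Stopped extends Status

  final case class Execution(executionId: Long, instanceId: Long)

  private var lastInstanceId = 0L
  private var lastExecutionId = 0L
  private val statusOf = mutable.Map.empty[Long, Status]
  private val instanceOf = mutable.Map.empty[Long, Long]

  private def newExecution(instanceId: Long): Execution = {
    lastExecutionId += 1
    statusOf(lastExecutionId) = Started
    instanceOf(lastExecutionId) = instanceId
    Execution(lastExecutionId, instanceId)
  }

  // start(): always a new job instance plus its first execution
  def start(): Execution = {
    lastInstanceId += 1
    newExecution(lastInstanceId)
  }

  def finish(executionId: Long, result: Status): Unit =
    statusOf(executionId) = result

  // restart(): looks up the previous execution and reuses its job instance
  def restart(previousExecutionId: Long): Execution = {
    val instanceId = instanceOf.getOrElse(previousExecutionId,
      throw new NoSuchElementException(s"no execution $previousExecutionId"))
    val status = statusOf(previousExecutionId)
    // Simplification: only FAILED or STOPPED executions can be restarted here
    require(status == Failed || status == Stopped, s"execution $previousExecutionId is $status, not restartable")
    newExecution(instanceId)
  }
}
```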
The first run is made to fail after reading 45 records.
// first fail
val p1 = new Properties
p1.put("readerFail", "true")
p1.put("readerFailCount", "45")
p1.put("batchletFail", "false")

val executionId = jobOperator.start("my-job", p1)
val jobExecution = jobOperator.getJobExecution(executionId)
jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0L, TimeUnit.SECONDS)

jobExecution.getBatchStatus should be(BatchStatus.FAILED)
Let's run it.
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$2
INFO: [reader fail and restart] start
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: item reader start, from 0
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter open
INFO: item writer start
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.jberet.runtime.runner.ChunkRunner readProcessWriteItems
ERROR: ProcessingInfo{count=20, timerExpired=false, itemState=RUNNING, chunkState=RUNNING, checkpointPosition=24, readPosition=45, failurePoint=null}
6 17, 2017 1:12:26 午後 org.jberet.runtime.runner.ChunkRunner run
ERROR: item-count=25, time-limit=0, skip-limit=-1, skipCount=0, retry-limit=-1, retryCount=0
6 17, 2017 1:12:26 午後 org.jberet.runtime.runner.ChunkRunner run
ERROR: JBERET000007: Failed to run job my-job, chunk-step, org.jberet.job.model.Chunk@293f8c7c
java.lang.RuntimeException: Oops!! ItemReader, count = 45
	at org.littlewings.javaee7.batch.MyItemReader.readItem(MyItemReader.scala:41)
	at org.jberet.runtime.runner.ChunkRunner.readItem(ChunkRunner.java:359)
	at org.jberet.runtime.runner.ChunkRunner.readProcessWriteItems(ChunkRunner.java:305)
	at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:201)
	at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:226)
	at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:147)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88)
	at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:60)
	at org.jberet.spi.JobExecutor$1.run(JobExecutor.java:99)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: total count = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: item writer end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader close
INFO: item reader end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$2
INFO: first job end.
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: item reader start, from 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter open
INFO: item writer start
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 50
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 75
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 100
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: total count = 75
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: item writer end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader close
INFO: item reader end
6 17, 2017 1:12:26 午後 org.jberet.runtime.context.StepContextImpl <init>
WARN: JBERET000018: Could not find the original step execution to restart. Current step execution id: 0, step name: batchlet-step
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: process batchlet
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$2
INFO: restart job end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$2
INFO: [reader fail and restart] end, execution-id first = 2, restarted = 3
After reading 45 records, the step fails:
6 17, 2017 1:12:26 午後 org.jberet.runtime.runner.ChunkRunner readProcessWriteItems
ERROR: ProcessingInfo{count=20, timerExpired=false, itemState=RUNNING, chunkState=RUNNING, checkpointPosition=24, readPosition=45, failurePoint=null}
6 17, 2017 1:12:26 午後 org.jberet.runtime.runner.ChunkRunner run
ERROR: item-count=25, time-limit=0, skip-limit=-1, skipCount=0, retry-limit=-1, retryCount=0
6 17, 2017 1:12:26 午後 org.jberet.runtime.runner.ChunkRunner run
ERROR: JBERET000007: Failed to run job my-job, chunk-step, org.jberet.job.model.Chunk@293f8c7c
java.lang.RuntimeException: Oops!! ItemReader, count = 45
The step then closes the writer and reader, and the first job ends:
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: total count = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: item writer end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader close
INFO: item reader end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$2
INFO: first job end.
On restart, reading resumes from 25, that is, from the second chunk:
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: item reader start, from 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter open
INFO: item writer start
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 50
After the restart the job completes normally, and the first run and the restarted run get different execution IDs:
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$2
INFO: [reader fail and restart] end, execution-id first = 2, restarted = 3
The repository contents after the run:
mysql> select * from JOB_EXECUTION where JOBEXECUTIONID IN (2, 3);
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
| JOBEXECUTIONID | JOBINSTANCEID | VERSION | CREATETIME | STARTTIME | ENDTIME | LASTUPDATEDTIME | BATCHSTATUS | EXITSTATUS | JOBPARAMETERS | RESTARTPOSITION |
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
| 2 | 2 | NULL | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | FAILED | FAILED | readerFail = true batchletFail = false readerFailCount = 45 | NULL |
| 3 | 2 | NULL | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | COMPLETED | COMPLETED | readerFail = false batchletFail = false readerFailCount = 0 | NULL |
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
2 rows in set (0.00 sec)

mysql> select * from JOB_INSTANCE;
+---------------+---------+---------+-----------------+
| JOBINSTANCEID | VERSION | JOBNAME | APPLICATIONNAME |
+---------------+---------+---------+-----------------+
|             1 | NULL    | my-job  | NULL            |
|             2 | NULL    | my-job  | NULL            |
+---------------+---------+---------+-----------------+
2 rows in set (0.00 sec)

mysql> select * from STEP_EXECUTION where
| STEPEXECUTIONID | JOBEXECUTIONID | VERSION | STEPNAME | STARTTIME | ENDTIME | BATCHSTATUS | EXITSTATUS | EXECUTIONEXCEPTION | PERSISTENTUSERDATA | READCOUNT | WRITECOUNT | COMMITCOUNT | ROLLBACKCOUNT | READSKIPCOUNT | PROCESSSKIPCOUNT | FILTERCOUNT | WRITESKIPCOUNT | READERCHECKPOINTINFO | WRITERCHECKPOINTINFO |
| 3 | 2 | NULL | chunk-step | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | FAILED | FAILED | java.lang.RuntimeException: Oops!! ItemReader, count = 45
	at org.littlewings.javaee7.batch.MyItemReader.readItem(MyItemReader.scala:41)
	at org.jberet.runtime.runner.ChunkRunner.readItem(ChunkRunner.java:359)
	at org.jberet.runtime.runner.ChunkRunner.readProcessWriteItems(ChunkRunner.java:305)
	at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:201)
	at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:226)
	at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:147)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88)
	at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:60)
	at org.jberet.spi.JobExecutor$1.run(JobExecutor.java:99)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748) | NULL | 45 | 25 | 1 | 1 | 0 | 0 | 0 | 0 | �� sr *org.littlewings.javaee7.batch.MyCheckPoint ^A^B ^AI counterxp ^Y | NULL |
| 4 | 3 | NULL | chunk-step | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | COMPLETED | COMPLETED | NULL | NULL | 75 | 75 | 4 | 0 | 0 | 0 | 0 | 0 | �� sr *org.littlewings.javaee7.batch.MyCheckPoint ^A^B ^AI counterxp e | NULL |
| 5 | 3 | NULL | batchlet-step | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | COMPLETED | Batchlet Success | NULL | NULL | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | NULL | NULL |
rows in set (0.00 sec)
STEP_EXECUTION even stores the stack trace.
Note also that on restart the JOBEXECUTIONID changes, while the JOBINSTANCEID apparently stays the same.
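The counters in STEP_EXECUTION can be reproduced with a much-simplified model of a chunk step. This is a hypothetical sketch, not JBeret's ChunkRunner: it assumes one commit per chunk (including a final empty chunk once the reader returns null), a checkpoint captured at each commit, and a rollback of the open chunk on failure. With MyItemReader's logic it yields the table values: 100/100 with COMMITCOUNT 5 and checkpoint 101 (`e`) for the successful run; READCOUNT 45, WRITECOUNT 25, COMMITCOUNT 1, ROLLBACKCOUNT 1 and checkpoint 25 (`^Y`) for the failed run, whose open chunk held the 20 items logged as `count=20`; and 75/75 with COMMITCOUNT 4 after the restart.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, much-simplified chunk-step model (not JBeret's ChunkRunner).
object ChunkModel {
  final case class StepResult(read: Int, written: Int, commits: Int, rollbacks: Int,
                              checkpoint: Int, failed: Boolean)

  // Mirrors MyItemReader.readItem: fail check, increment, then next item or end
  private final class Reader(start: Int, failAt: Option[Int]) {
    var counter: Int = start
    private val items = (start until 100).iterator
    def read(): Option[Int] = {
      if (failAt.contains(counter)) throw new RuntimeException(s"Oops!! count = $counter")
      counter += 1
      if (items.hasNext) Some(items.next()) else None
    }
  }

  def run(start: Int, failAt: Option[Int], itemCount: Int = 25): StepResult = {
    val reader = new Reader(start, failAt)
    var read, written, commits = 0
    var checkpoint = start
    var finished = false
    try {
      while (!finished) {
        val chunk = ArrayBuffer.empty[Int]
        while (!finished && chunk.size < itemCount) {
          reader.read() match {
            case Some(item) => chunk += item; read += 1
            case None       => finished = true
          }
        }
        written += chunk.size       // the writer receives the whole chunk
        commits += 1                // one commit per chunk, even an empty last one
        checkpoint = reader.counter // checkpointInfo() captured at commit time
      }
      StepResult(read, written, commits, 0, checkpoint, false)
    } catch {
      case _: RuntimeException =>
        // the open chunk is rolled back; the last committed checkpoint survives
        StepResult(read, written, commits, 1, checkpoint, true)
    }
  }
}
```

Here `run(0, None)`, `run(0, Some(45))` and `run(25, None)` correspond to the successful run, the failed run and the restart.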
Finally, the case where the Batchlet fails.
  test("batchlet fail and restart") {
    logger.info("[batchlet fail and restart] start")

    val jobOperator = BatchRuntime.getJobOperator

    // first fail
    val p1 = new Properties
    p1.put("readerFail", "false")
    p1.put("readerFailCount", "0")
    p1.put("batchletFail", "true")

    val executionId = jobOperator.start("my-job", p1)
    val jobExecution = jobOperator.getJobExecution(executionId)
    jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0L, TimeUnit.SECONDS)

    jobExecution.getBatchStatus should be(BatchStatus.FAILED)

    logger.info("first job end.")

    // retry
    val p2 = new Properties
    p2.put("readerFail", "false")
    p2.put("readerFailCount", "0")
    p2.put("batchletFail", "false")

    val restartedExecutionId = jobOperator.restart(executionId, p2)
    executionId should not be (restartedExecutionId)

    val restartedJobExecution = jobOperator.getJobExecution(restartedExecutionId)
    restartedJobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0L, TimeUnit.SECONDS)

    restartedJobExecution.getBatchStatus should be(BatchStatus.COMPLETED)

    logger.info("restart job end")

    logger.infof("[batchlet fail and restart] end, execution-id first = %d, restarted = %d", executionId, restartedExecutionId)
  }
The log output looks like this:
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$3
INFO: [batchlet fail and restart] start
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: item reader start, from 0
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter open
INFO: item writer start
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 50
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 75
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 100
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: total count = 100
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: item writer end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader close
INFO: item reader end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: process batchlet
6 17, 2017 1:12:26 午後 org.jberet.runtime.runner.BatchletRunner run
WARN: JBERET000001: Failed to run batchlet org.jberet.job.model.RefArtifact@698cf36c
java.lang.RuntimeException: Oops!! Batchlet
	at org.littlewings.javaee7.batch.MyBatchlet.process(MyBatchlet.scala:22)
	at org.jberet.runtime.runner.BatchletRunner.run(BatchletRunner.java:72)
	at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:229)
	at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:147)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runJobElement(CompositeExecutionRunner.java:128)
	at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:203)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88)
	at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:60)
	at org.jberet.spi.JobExecutor$1.run(JobExecutor.java:99)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$3
INFO: first job end.
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: process batchlet
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$3
INFO: restart job end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$3
INFO: [batchlet fail and restart] end, execution-id first = 4, restarted = 5
In this case, the ItemReader/ItemWriter step had already completed, so the restart resumes from the Batchlet step.
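To make that rule concrete, here is a minimal Scala model of which steps a restart runs. It is not JBeret code: `RestartSketch` and `stepsToRunOnRestart` are hypothetical names. The model assumes a linear job (steps chained in order, no decisions or splits) in which no step sets `allow-start-if-complete="true"`. Under those assumptions, a restart begins at the first step that did not complete in the previous execution and then runs the remaining steps in order.

```scala
object RestartSketch {
  // Hypothetical helper, not a JBeret or javax.batch API.
  // Assumes a linear job with no decisions/splits and no step that sets
  // allow-start-if-complete="true": the restart begins at the first step
  // that did not complete in the previous execution and runs the rest in order.
  def stepsToRunOnRestart(jobSteps: Seq[String], completed: Set[String]): Seq[String] =
    jobSteps.dropWhile(completed.contains)

  def main(args: Array[String]): Unit = {
    val jobSteps = Seq("chunk-step", "batchlet-step")

    // Batchlet failure (execution 4): chunk-step COMPLETED, batchlet-step FAILED.
    println(stepsToRunOnRestart(jobSteps, completed = Set("chunk-step"))) // List(batchlet-step)

    // Reader failure: nothing completed, so the restart re-enters chunk-step,
    // where MyItemReader.open receives the last saved checkpoint.
    println(stepsToRunOnRestart(jobSteps, completed = Set.empty)) // List(chunk-step, batchlet-step)
  }
}
```

In the real runtime the restart point is tracked by JBeret itself. The model only shows why the restarted execution contains nothing but the Batchlet step here.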
For reference, here is what the tables contain.
mysql> select * from JOB_EXECUTION where JOBEXECUTIONID IN (4, 5);
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
| JOBEXECUTIONID | JOBINSTANCEID | VERSION | CREATETIME | STARTTIME | ENDTIME | LASTUPDATEDTIME | BATCHSTATUS | EXITSTATUS | JOBPARAMETERS | RESTARTPOSITION |
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
| 4 | 3 | NULL | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | FAILED | FAILED | readerFail = false batchletFail = true readerFailCount = 0 | NULL |
| 5 | 3 | NULL | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | COMPLETED | COMPLETED | readerFail = false batchletFail = false readerFailCount = 0 | NULL |
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
2 rows in set (0.00 sec)

mysql> select * from JOB_INSTANCE;
+---------------+---------+---------+-----------------+
| JOBINSTANCEID | VERSION | JOBNAME | APPLICATIONNAME |
+---------------+---------+---------+-----------------+
| 1 | NULL | my-job | NULL |
| 2 | NULL | my-job | NULL |
| 3 | NULL | my-job | NULL |
+---------------+---------+---------+-----------------+
3 rows in set (0.00 sec)

mysql> select * from STEP_EXECUTION where JOBEXECUTIONID IN (4, 5);
| STEPEXECUTIONID | JOBEXECUTIONID | VERSION | STEPNAME | STARTTIME | ENDTIME | BATCHSTATUS | EXITSTATUS | EXECUTIONEXCEPTION | PERSISTENTUSERDATA | READCOUNT | WRITECOUNT | COMMITCOUNT | ROLLBACKCOUNT | READSKIPCOUNT | PROCESSSKIPCOUNT | FILTERCOUNT | WRITESKIPCOUNT | READERCHECKPOINTINFO | WRITERCHECKPOINTINFO |
| 6 | 4 | NULL | chunk-step | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | COMPLETED | COMPLETED | NULL | NULL | 100 | 100 | 5 | 0 | 0 | 0 | 0 | 0 | �� sr *org.littlewings.javaee7.batch.MyCheckPoint ^A^B ^AI counterxp e | NULL |
| 7 | 4 | NULL | batchlet-step | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | FAILED | FAILED | java.lang.RuntimeException: Oops!! Batchlet
	at org.littlewings.javaee7.batch.MyBatchlet.process(MyBatchlet.scala:22)
	at org.jberet.runtime.runner.BatchletRunner.run(BatchletRunner.java:72)
	at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:229)
	at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:147)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runJobElement(CompositeExecutionRunner.java:128)
	at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:203)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88)
	at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:60)
	at org.jberet.spi.JobExecutor$1.run(JobExecutor.java:99)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748) | NULL | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | NULL | NULL |
| 8 | 5 | NULL | batchlet-step | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | COMPLETED | Batchlet Success | NULL | NULL | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | NULL | NULL |
3 rows in set (0.00 sec)
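Looks good: executions 4 and 5 belong to the same job instance (3), and the restarted execution 5 ran only batchlet-step.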
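The READERCHECKPOINTINFO column of the chunk step holds the reader's checkpoint as bytes. The visible `sr` marker and class name suggest standard Java serialization. That is an assumption about JBeret's storage format, not something confirmed here. The sketch below round-trips a hypothetical stand-in class, `CheckPointSketch`, using `java.io` streams only. It uses a single int field named `counter`, matching what the column shows. The value 101 is an inference: the last visible character, `e`, is byte 0x65 (101). This appears to match MyItemReader incrementing `counter` once more on the final `readItem()` call that returns null.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for the article's MyCheckPoint: the READERCHECKPOINTINFO
// bytes show a single int field named "counter".
class CheckPointSketch extends java.io.Serializable {
  var counter: Int = 0
}

object CheckpointBytesSketch {
  // Standard Java serialization (assumed to be what ended up in the column).
  def toBytes(obj: java.io.Serializable): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer) // writes the 0xACED 0x0005 stream header right away
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Deserialize, resolving classes through this class's loader, which helps
  // when build tools run code in layered class loaders.
  def fromBytes(bytes: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, getClass.getClassLoader)
    }
    try in.readObject() finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val checkpoint = new CheckPointSketch
    checkpoint.counter = 101
    val bytes = toBytes(checkpoint)

    // The two leading bytes are the Java serialization magic number.
    println(f"${bytes(0) & 0xff}%02X${bytes(1) & 0xff}%02X") // ACED

    // Field names are stored as (modified) UTF-8, so "counter" is readable in the raw bytes.
    println(new String(bytes, "ISO-8859-1").contains("counter")) // true

    // On restart, an object like this is what ItemReader.open(checkpoint) receives.
    val restored = fromBytes(bytes).asInstanceOf[CheckPointSketch]
    println(restored.counter) // 101
  }
}
```

The leading 0xAC 0xED bytes are not valid text, which would explain why the mysql client rendered them as `��` in the column.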
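Summary
Using jBatch, or more precisely JBeret SE, I switched the Job Repository's storage to a database (MySQL).
I confirmed that job information is actually stored in MySQL. This was also my first time looking at how a restart plays out, and it helped my understanding.
The code written for this post is available here.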
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jbatch-jdbc-job-repository