CLOVER🍀

That was when it all began.

JTAのSynchronizationを使ってみる

JTAで、今まで割と目にしていたもので、かつ使ったことのないもののひとつにSynchronizationがあったので、ちょっと試してみました。

Synchronizationとは?

JTAのJSR、JSR-907を見てみると

JSR 907: Java Transaction API (JTA)

「3.3.2 Transaction Synchronization」として記載があります。

内容的には、こんな感じ。

Transaction Synchronizationは、TransactionManagerがトランザクション完了する前後で、アプリケーションサーバが通知を得ることを可能にします。トランザクションの開始時に、アプリケーションサーバーはTransactionManagerからのコールバックを受けるjavax.transaction.Synchronizationオブジェクトを登録するかもしれません。

トランザクションの完了時に、コールバックを受けるための仕組みのようですね。アプリケーションサーバーが使うことを想定した仕組みっぽいですが…。

javax.transaction.Synchronizationはインターフェースで、beforeCompletionとafterCompletionの2つのメソッドを持ちます。

beforeCompletionメソッドは、2フェーズコミット開始時に先がけて呼び出されます。あくまで、コミット開始時っぽいですね。

afterCompletionメソッドは、トランザクションの完了後に呼び出されます。トランザクションのステータスは、メソッドの引数として与えられます。

では、ちょっと試してみましょう。

準備

まずはビルド定義から。JTAの実装は、Narayanaとします。
build.sbt

name := "jta-synchronization"

version := "0.0.1-SNAPSHOT"

organization := "org.littlewings"

scalaVersion := "2.11.8"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

libraryDependencies ++= Seq(
  // Nayarana JTA
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.1.Final" % "runtime",
  "org.jboss.narayana.jta" % "narayana-jta" % "5.3.3.Final",

  // JNDI Server
  "jboss" % "jnpserver" % "4.2.2.GA",
  "org.jboss.logging" % "jboss-logging" % "3.3.0.Final",

  // JPA
  "org.hibernate" % "hibernate-entitymanager" % "5.2.1.Final",
  "dom4j" % "dom4j" % "1.6.1",

  // H2 Database
  "com.h2database" % "h2" % "1.4.192",

  // Test
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

確認には、JPAとH2Databasew使うことにしました。

JPAのEntity/設定とNarayanaの設定

JPAで使うEntityは、書籍をテーマにしたものとします。
src/test/scala/org/littlewings/javaee7/jta/Book.scala

package org.littlewings.javaee7.jta

import javax.persistence._

import scala.beans.BeanProperty

object Book {
  def apply(isbn: String, title: String, price: Int): Book = {
    val b = new Book
    b.isbn = isbn
    b.title = title
    b.price = price
    b
  }
}

@Entity
@Table(name = "book")
@SerialVersionUID(1L)
class Book extends Serializable {
  @Id
  @GeneratedValue(strategy = GenerationType.AUTO)
  @BeanProperty
  var id: Long = _

  @Column
  @BeanProperty
  var isbn: String = _

  @Column
  @BeanProperty
  var title: String = _

  @Column
  @BeanProperty
  var price: Int = _
}

単純なものです。

persistence.xmlは、以下のように設定。
src/test/resources/META-INF/persistence.xml

<?xml version="1.0" encoding="UTF-8"?>
<persistence xmlns="http://xmlns.jcp.org/xml/ns/persistence"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence
                                 http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd"
             version="2.1">
    <persistence-unit name="javaee7.standalone.jta.pu" transaction-type="JTA">
        <provider>org.hibernate.jpa.HibernatePersistenceProvider</provider>
        <jta-data-source>jdbc/h2Ds</jta-data-source>
        <properties>
            <property name="hibernate.dialect" value="org.hibernate.dialect.H2Dialect"/>
            <property name="hibernate.show_sql" value="true"/>
            <property name="hibernate.format_sql" value="true"/>
            <property name="hibernate.hbm2ddl.auto" value="update"/>
        </properties>
    </persistence-unit>
</persistence>

Narayanaを使うにはJNDIを利用することになるため、JNDIの設定を行います。
src/test/resources/jndi.properties

java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces

また、Narayana自体の設定は、以下のようにしておきました。
src/test/resources/jbossts-properties.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
    <entry key="ObjectStoreEnvironmentBean.communicationStore.objectStoreType">com.arjuna.ats.internal.arjuna.objectstore.VolatileStore</entry>
    <entry key="ObjectStoreEnvironmentBean.objectStoreDir">./target/ObjectStore</entry>
</properties>

トランザクションログを、targetディレクトリ配下にしているだけです。

テストコードの雛形

利用するテストコードの雛形は、以下のような形で用意しました。
src/test/scala/org/littlewings/javaee7/jta/JtaSynchronizationSpec.scala

package org.littlewings.javaee7.jta

import javax.naming.InitialContext
import javax.persistence.{NoResultException, Persistence}
import javax.transaction.{Status, TransactionManager, UserTransaction}

import com.arjuna.ats.jta.common.jtaPropertyManager
import com.arjuna.ats.jta.utils.JNDIManager
import org.h2.jdbcx.JdbcDataSource
import org.jnp.interfaces.NamingParser
import org.jnp.server.NamingBeanImpl
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSpec, Matchers}

class JtaSynchronizationSpec extends FunSpec with Matchers with BeforeAndAfter with BeforeAndAfterAll {
  val namingBean: NamingBeanImpl = new NamingBeanImpl

  override def beforeAll(): Unit = {
    namingBean.start()

    JNDIManager.bindJTATransactionManagerImplementation()

    namingBean.getNamingInstance.createSubcontext(new NamingParser().parse("jboss"))
    jtaPropertyManager.getJTAEnvironmentBean.setTransactionManagerJNDIContext("java:/jboss/TransactionManager")
    jtaPropertyManager
      .getJTAEnvironmentBean
      .setTransactionSynchronizationRegistryJNDIContext("java:/jboss/TransactionSynchronizationRegistry")
    jtaPropertyManager
      .getJTAEnvironmentBean
      .setUserTransactionJNDIContext("java:comp/UserTransaction")

    JNDIManager.bindJTATransactionManagerImplementation()
    JNDIManager.bindJTAUserTransactionImplementation()

    namingBean.getNamingInstance.createSubcontext(new NamingParser().parse("jdbc"))

    val dataSource = new JdbcDataSource
    dataSource.setURL("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1")
    val context = new InitialContext
    context.bind("java:/jdbc/h2Ds", dataSource)
  }

  override def afterAll(): Unit = {
    namingBean.stop()
  }

  before {
    val emf = Persistence.createEntityManagerFactory("javaee7.standalone.jta.pu")
    val em = emf.createEntityManager
    val userTransaction = new InitialContext().lookup("java:comp/UserTransaction").asInstanceOf[UserTransaction]
    userTransaction.begin()
    em.createNativeQuery("TRUNCATE TABLE book").executeUpdate()
    userTransaction.commit()
    em.close()
    emf.close()
  }

  describe("JTA Synchronization Spec") {
    // ここに、テストを書く!
  }
}

テストクラス起動時にJNDIおよびJTA、DataSourceの設定を行い、終了時にJNDI Serverを止めます。また、各テストケースごとにデータは削除するようにしています。

Synchronizationを実装する

それでは、今回利用するSynchronizationを実装してみます。

簡単に、以下のように実装しました。
src/test/scala/org/littlewings/javaee7/jta/MySynchronization.scala

package org.littlewings.javaee7.jta

import javax.transaction.Synchronization

class MySynchronization extends Synchronization {
  private var calledBeforeCompletaion: Boolean = _
  private var afterCompletionStatus: Int = -1

  def getCalledBeforeCompletion: Boolean = calledBeforeCompletaion
  def getAfterCompletionStatus: Int = afterCompletionStatus

  override def afterCompletion(status: Int): Unit = {
    afterCompletionStatus = status
    println(s"afterCompletion, status = ${status}")
  }

  override def beforeCompletion(): Unit = {
    calledBeforeCompletaion = true
    println("beforeCompletion")
  }
}

今回は、beforeCompletionおよびafterCompletionの呼び出し時にprintlnでログ出力、beforeCompletionが呼び出された場合はフラグを立てて、afterCompletionが呼び出された時にはその時のステータスを保存するようにしています。

TransactionManagerで、各種操作を行う

では、作成したSynchronizationを使ってコードを書いてみましょう。

コミットするケース

最初は、コミット時に使用してみます。

書いたコードは、こんな感じに。

    it("JTA with Synchronization, commit") {
      val emf = Persistence.createEntityManagerFactory("javaee7.standalone.jta.pu")
      val em = emf.createEntityManager

      val transactionManager = new InitialContext().lookup("java:/jboss/TransactionManager").asInstanceOf[TransactionManager]
      transactionManager.getTransaction should be (null)

      transactionManager.begin()

      transactionManager.getTransaction should not be(null)

      val synchronization = new MySynchronization

      val tx = transactionManager.getTransaction
      tx.registerSynchronization(synchronization)

      em.persist(Book("978-4798140926", "Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築", 4104))
      em.persist(Book("978-4798042169", "わかりやすいJavaEEウェブシステム入門", 3456))
      em.persist(Book("978-4798124605", "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava (Programmer's SELECTION) ", 4536))

      println("commit, start")
      transactionManager.commit()
      println("commit, end")

      synchronization.getCalledBeforeCompletion should be(true)
      synchronization.getAfterCompletionStatus should be(Status.STATUS_COMMITTED)

      val query =
        em
          .createQuery("SELECT b FROM Book b WHERE isbn = :isbn", classOf[Book])
          .setParameter("isbn", "978-4798140926")

      query.getSingleResult.title should be("Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築")

      em.close()
      emf.close()
    }

利用するのは、UserTransactionではなくTransactionManagerになります。

      val transactionManager = new InitialContext().lookup("java:/jboss/TransactionManager").asInstanceOf[TransactionManager]

トランザクションを開始していない時は、TransactionManagerからTransactionが取得できませんが

      transactionManager.getTransaction should be (null)

開始後は、取得できるようになります。

      transactionManager.begin()

      transactionManager.getTransaction should not be(null)

ここで、先ほど作成したSynchronizationのインスタンスを作成してTransactionManager#getTransactionで取得できるTransactionに設定します。

      val synchronization = new MySynchronization

      val tx = transactionManager.getTransaction
      tx.registerSynchronization(synchronization)

あとはコミットすると、Synchronizationが呼び出されたことが確認できます。

      println("commit, start")
      transactionManager.commit()
      println("commit, end")

      synchronization.getCalledBeforeCompletion should be(true)
      synchronization.getAfterCompletionStatus should be(Status.STATUS_COMMITTED)

今回は、STATUS_COMMITTEDになっています。

printlnでのログ出力ですが、以下のようになりました。
Hibernateのログは除いています

commit, start
beforeCompletion
afterCompletion, status = 3
commit, end

動作していることが確認できましたね。

ロールバックするケース

今度は、ロールバックしてみます。

    it("JTA with Synchronization, rollback") {
      val emf = Persistence.createEntityManagerFactory("javaee7.standalone.jta.pu")
      val em = emf.createEntityManager

      val transactionManager = new InitialContext().lookup("java:/jboss/TransactionManager").asInstanceOf[TransactionManager]
      transactionManager.getTransaction should be (null)

      transactionManager.begin()

      transactionManager.getTransaction should not be(null)

      val synchronization = new MySynchronization

      val tx = transactionManager.getTransaction
      tx.registerSynchronization(synchronization)

      em.persist(Book("978-4798140926", "Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築", 4104))
      em.persist(Book("978-4798042169", "わかりやすいJavaEEウェブシステム入門", 3456))
      em.persist(Book("978-4798124605", "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava (Programmer's SELECTION) ", 4536))

      println("rollback, start")
      transactionManager.rollback()
      println("rollback, end")

      synchronization.getCalledBeforeCompletion should be(false)
      synchronization.getAfterCompletionStatus should be(Status.STATUS_ROLLEDBACK)

      val query =
        em
          .createQuery("SELECT b FROM Book b WHERE isbn = :isbn", classOf[Book])
          .setParameter("isbn", "978-4798140926")

      val thrown = the[NoResultException] thrownBy query.getSingleResult
      thrown.getMessage should be("No entity found for query")

      em.close()
      emf.close()
    }

ロールバックしているのでデータが保存されていないわけですが、Synchronizationでの結果も先ほどとちょっと変わります。

なんと、beforeCompletionが呼び出されていないことになっています。

      synchronization.getCalledBeforeCompletion should be(false)
      synchronization.getAfterCompletionStatus should be(Status.STATUS_ROLLEDBACK)

ステータスは、STATUS_ROLLEDBACKになっていますが。

ログを見ても、確かに呼ばれていません。

rollback, start
afterCompletion, status = 4
rollback, end

ここでJSRを振り返ると、確かにbeforeCompletionはコミット開始時に、って感じのことが書かれていましたからねぇ。

なるほど。

suspend/resumeも試してみる

オマケとして、suspendやresumeも試してみます。

    it("JTA with Synchronization, suspend/resume/commit") {
      val emf = Persistence.createEntityManagerFactory("javaee7.standalone.jta.pu")
      val em = emf.createEntityManager

      val transactionManager = new InitialContext().lookup("java:/jboss/TransactionManager").asInstanceOf[TransactionManager]
      transactionManager.getTransaction should be (null)

      transactionManager.begin()

      transactionManager.getTransaction should not be(null)

      val synchronization = new MySynchronization

      val tx = transactionManager.getTransaction
      tx.registerSynchronization(synchronization)

      em.persist(Book("978-4798140926", "Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築", 4104))
      em.persist(Book("978-4798042169", "わかりやすいJavaEEウェブシステム入門", 3456))
      em.persist(Book("978-4798124605", "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava (Programmer's SELECTION) ", 4536))

      println("suspend, start")
      val suspendedTx = transactionManager.suspend()
      println("suspend, end")

      synchronization.getCalledBeforeCompletion should be(false)
      synchronization.getAfterCompletionStatus should be(-1)

      suspendedTx should be theSameInstanceAs(tx)

      println("resume, start")
      transactionManager.resume(suspendedTx)
      println("resume, end")

      synchronization.getCalledBeforeCompletion should be(false)
      synchronization.getAfterCompletionStatus should be(-1)

      println("commit, start")
      transactionManager.commit()
      println("commit, end")

      synchronization.getCalledBeforeCompletion should be(true)
      synchronization.getAfterCompletionStatus should be(Status.STATUS_COMMITTED)

      val query =
        em
          .createQuery("SELECT b FROM Book b WHERE isbn = :isbn", classOf[Book])
          .setParameter("isbn", "978-4798140926")

      query.getSingleResult.title should be("Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築")

      em.close()
      emf.close()
    }

まあ、ここまでくると予想がつきますが、suspendやresumeの呼び出しではSynchronizationは呼び出されません。あくまで、トランザクション完了時だということですね。

なお、当たり前かもしれませんが、TransactionManager#getTransactionで取得したTransactionとTransactionManager#suspendで取得したTransactionは同一のものですね。

      val suspendedTx = transactionManager.suspend()

      suspendedTx should be theSameInstanceAs(tx)

まとめ

JTAのSynchronizationを使ってみました。

これを試してみたのは、他のライブラリなどでJTAトランザクションに参加するためのひとつの手段として使われていたからですが、使ってみて挙動を確認することができました。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jta-synchronization

参考)
JavaEE使い方メモ(JTA)