CLOVER🍀

That was when it all began.

Java EE環境で、Infinispanのトランザクション対応を試す

久しぶりに、Java EEを交えたネタ。以前Infinispanのトランザクション対応をスタンドアロンで試しましたが、今度はJava EE環境(JBoss AS 7.1)で試してみたいと思います。

せっかくJava EE環境でやるのなら、JPAと組み合わせてCacheと一緒にトランザクション管理してみましょう。

準備、設定ファイル

というわけで、準備。
build.sbt

name := "infinispan-jta"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.3"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation")

seq(webSettings: _*)

artifactName := { (version: ScalaVersion, module: ModuleID, artifact: Artifact) =>
  //artifact.name + "." + artifact.extension
  "javaee6-web." + artifact.extension
}

libraryDependencies ++= Seq(
  "org.eclipse.jetty" % "jetty-webapp" % "9.1.0.v20131115" % "container",
  "org.eclipse.jetty" % "jetty-plus"   % "9.1.0.v20131115" % "container",
  "javax" % "javaee-web-api" % "6.0" % "provided",
  "mysql" % "mysql-connector-java" % "5.1.26" % "provided",
  "org.infinispan" % "infinispan-core" % "6.0.1.Final" excludeAll(
    ExclusionRule(organization = "org.jgroups", name = "jgroups"),
    ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling-river"),
    ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling"),
    ExclusionRule(organization = "org.jboss.logging", name = "jboss-logging"),
    ExclusionRule(organization = "org.jboss.spec.javax.transaction", name = "jboss-transaction-api_1.1_spec")
  ),
  "org.jgroups" % "jgroups" % "3.4.1.Final",
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.1_spec" % "1.0.1.Final",
  "org.jboss.marshalling" % "jboss-marshalling-river" % "1.3.18.GA",
  "org.jboss.marshalling" % "jboss-marshalling" % "1.3.18.GA",
  "org.jboss.logging" % "jboss-logging" % "3.1.2.GA",
  "net.jcip" % "jcip-annotations" % "1.0"
)

今回はWARファイルを作ってデプロイするので、xsbt-web-pluginも使用します。
project/plugins.sbt

addSbtPlugin("com.earldouglas" % "xsbt-web-plugin" % "0.6.0")

トランザクションに対応させた、Infinispanの設定ファイル。今回は、分離レベルをREPEATABLE_READにして、楽観的ロックを使用できるようにしました。
src/main/resources/infinispan-optimistic-tx.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"
    xmlns="urn:infinispan:config:6.0">

  <global>
    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        />

    <shutdown hookBehavior="REGISTER"/>
  </global>

  <default>
    <transaction 
        transactionMode="TRANSACTIONAL"
        lockingMode="OPTIMISTIC"
        autoCommit="false" />
    <versioning enabled="true" versioningScheme="SIMPLE" />
    <locking
        isolationLevel="REPEATABLE_READ"
        writeSkewCheck="true" />
  </default>

</infinispan>

クラスタリングは、今回はパス。

永続性ユニットの設定。まあ、XAで試すには…ですが、MySQLを使用します。
src/main/resources/META-INF/persistence.xml

<?xml version="1.0" encoding="UTF-8"?>
<persistence xmlns="http://java.sun.com/xml/ns/persistence"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
             version="2.0">
  <persistence-unit name="javaee6.web.pu" transaction-type="JTA">
    <provider>org.hibernate.ejb.HibernatePersistence</provider>
    <jta-data-source>java:jboss/datasources/mysqlXaDs</jta-data-source>
    <properties>
      <property name="hibernate.dialect" value="org.hibernate.dialect.MySQL5InnoDBDialect" />
      <property name="hibernate.show_sql" value="true" />
      <property name="hibernate.format_sql" value="true" />
    </properties>
  </persistence-unit>

あとは、CDIを使うために以下のファイルを作成します。

src/main/webapp/WEB-INF/beans.xml

Java EE 6環境ですので。

Infinispan×CDI

今回は、InfinispanのCDIモジュールを使うのではなく、普通に@Produces、@Disposesアノテーションを使って管理することにしました。
src/main/scala/org/littlewings/infinispan/jta/inject/CacheProducer.scala

package org.littlewings.infinispan.jta.inject

import javax.enterprise.context.ApplicationScoped
import javax.enterprise.inject.{Disposes, Produces}
import javax.inject.Inject

import org.infinispan.Cache
import org.infinispan.manager.{DefaultCacheManager, EmbeddedCacheManager}

import org.littlewings.infinispan.jta.entity.User

class CacheProducer {
  @Produces
  @ApplicationScoped
  def getEmbeddedCacheManager: EmbeddedCacheManager =
    new DefaultCacheManager("infinispan-optimistic-tx.xml")

  def stopEmbeddedCacheManager(@Disposes manager: EmbeddedCacheManager): Unit =
    manager.stop()

  @Produces
  @ApplicationScoped
  def getCache(@Inject manager: EmbeddedCacheManager): Cache[Int, User] =
    manager.getCache()

  def stopCache(@Disposes cache: Cache[Int, User]): Unit =
    cache.stop()
}

Userというクラスは、今回定義したJPAで使用するエンティティです。

Entity

永続化対象の、Entityクラス。
src/main/scala/org/littlewings/infinispan/jta/entity/User.scala

package org.littlewings.infinispan.jta.entity

import scala.beans.BeanProperty

import javax.persistence.{Column, Entity, Id, Table, Version}

object User {
  def apply(id: Int, firstName: String, lastName: String, age: Int): User = {
    val user = new User
    user.id = id
    user.firstName = firstName
    user.lastName = lastName
    user.age = age
    user
  }
}

@SerialVersionUID(1L)
@Entity
@Table(name = "user")
class User extends Serializable {
  @Id
  @BeanProperty
  var id: Int = _

  @Column(name = "first_name")
  @BeanProperty
  var firstName: String = _

  @Column(name = "last_name")
  @BeanProperty
  var lastName: String = _

  @Column
  @BeanProperty
  var age: Int = _

  @Column(name = "version_no")
  @Version
  @BeanProperty
  var versionNo: Int = _

  override def toString: String =
    s"id = $id, firstName = $firstName, lastName = $lastName, age = $age, versionNo = $versionNo"
}

EJB

Entityを操作する、Stateless Bean。
src/main/scala/org/littlewings/infinispan/jta/service/UserService.scala

package org.littlewings.infinispan.jta.service

import scala.collection.JavaConverters._

import javax.ejb.{LocalBean, Stateless}
import javax.persistence.{EntityManager, PersistenceContext}

import org.littlewings.infinispan.jta.entity.User

@Stateless
@LocalBean
class UserService {
  @PersistenceContext
  private var em: EntityManager = _

  def create(user: User): Unit =
    em.persist(user)

  def createFail(user: User): Unit = {
    em.persist(user)
    throw new RuntimeException("Oops!!")
  }

  def udpate(user: User): User =
    em.merge(user)

  def remove(user: User): Unit =
    em.remove(em.merge(user))

  def findById(id: Int): User =
    em.find(classOf[User], id)

  def findAllOrderById: Iterable[User] =
    em
      .createQuery("""|SELECT u
                      |  FROM User u
                      | ORDER BY u.id ASC""".stripMargin)
      .getResultList
      .asScala
      .asInstanceOf[Iterable[User]]

  def removeAll(): Unit =
    em
      .createQuery("DELETE FROM User")
      .executeUpdate()
}

ひとつ、失敗するようなメソッドが入っています。

  def createFail(user: User): Unit = {
    em.persist(user)
    throw new RuntimeException("Oops!!")
  }

Cacheを操作するための、Stateless Bean。
src/main/scala/org/littlewings/infinispan/jta/service/CacheService.scala

package org.littlewings.infinispan.jta.service

import scala.collection.JavaConverters._

import javax.ejb.{LocalBean, Stateless}
import javax.inject.Inject

import org.infinispan.Cache

import org.littlewings.infinispan.jta.entity.User

@Stateless
@LocalBean
class CacheService {
  @Inject
  private var cache: Cache[Int, User] = _

  def get(id: Int): User =
    cache.get(id)

  def put(user: User): Unit =
    cache.put(user.id, user)

  def putFail(user: User): Unit = {
    cache.put(user.id, user)
    throw new RuntimeException("Oops!")
  }

  def clear(): Unit =
    cache.clear()

  def findAllOrderById: Iterable[User] =
    cache.values.asScala.toSeq.sortWith { _.id < _.id }
}

こちらも、やっぱり失敗する用のメソッドが入っています。

そして、上記2つのEJBをまとめて使用するStateless Bean。
src/main/scala/org/littlewings/infinispan/jta/service/AggregateService.scala

package org.littlewings.infinispan.jta.service

import javax.ejb.{EJB, LocalBean, Stateless}

import org.littlewings.infinispan.jta.entity.User

@Stateless
@LocalBean
class AggregateService {
  @EJB
  private var userService: UserService = _

  @EJB
  private var cacheService: CacheService = _

  def create(user: User): Unit = {
    cacheService.put(user)
    userService.create(user)
  }

  def createCacheAndEntityFail(user: User): Unit = {
    cacheService.put(user)
    try {
      userService.createFail(user)
    } catch {
      case e: Exception =>
    }
  }

  def createEntityAndCacheFail(user: User): Unit = {
    userService.create(user)
    try {
      cacheService.putFail(user)
    } catch {
      case e: Exception =>
    }
  }

  def deleteAll(): Unit = {
    userService.removeAll()
    cacheService.clear()
  }
}

Cacheに保存してその後失敗するパターンと、その逆のパターンを用意しています。

  def createCacheAndEntityFail(user: User): Unit = {
    cacheService.put(user)
    try {
      userService.createFail(user)
    } catch {
      case e: Exception =>
    }
  }

  def createEntityAndCacheFail(user: User): Unit = {
    userService.create(user)
    try {
      cacheService.putFail(user)
    } catch {
      case e: Exception =>
    }
  }

ロールバック用です。中で送出するExceptionを捕まえちゃってますが、それでも同じトランザクションに参加しているので、前のリソースに対する更新もロールバックされることを確認します。

JAX-RS

こちらは、さらっと。JAX-RSの有効化。
src/main/scala/org/littlewings/infinispan/jta/jaxrs/RestApplication.scala

package org.littlewings.infinispan.jta.jaxrs

import javax.ws.rs.ApplicationPath
import javax.ws.rs.core.Application

@ApplicationPath("/rest")
class RestApplication extends Application

リソースクラス。
src/main/scala/org/littlewings/infinispan/jta/jaxrs/TxResource.scala

package org.littlewings.infinspan.jta.jaxrs

import javax.inject.Inject
import javax.ws.rs.{DELETE, GET, Path, PathParam, Produces, PUT}
import javax.ws.rs.core.MediaType

import org.infinispan.Cache

import org.littlewings.infinispan.jta.entity.User
import org.littlewings.infinispan.jta.service.{AggregateService, CacheService, UserService}

@Path("tx")
class TxResource {
  @Inject
  private var userService: UserService = _

  @Inject
  private var cacheService: CacheService = _

  @Inject
  private var aggregateService: AggregateService = _

  private def createUser(id: Int): User =
    id match {
      case 1 => User(1, "カツオ", "磯野", 11)
      case 2 => User(2, "ワカメ", "磯野", 9)
      case 3 => User(3, "タラオ", "フグ田", 3)
    }

  @GET
  @Path("{id}")
  @Produces(Array(MediaType.TEXT_PLAIN))
  def index(@PathParam("id") id: Int): String =
    s"""|Database => ${userService.findById(id)}
        |Cache    => ${cacheService.get(id)}
        |""".stripMargin

  @GET
  @Produces(Array(MediaType.TEXT_PLAIN))
  def index: String =
    s"""|Database =>
        |${userService.findAllOrderById.mkString("  ", System.lineSeparator + "  ", "")}
        |Cache =>
        |${cacheService.findAllOrderById.mkString("  ", System.lineSeparator + "  " , "")}
        |""".stripMargin

  @PUT
  @Path("{id}")
  @Produces(Array(MediaType.TEXT_PLAIN))
  def create(@PathParam("id") id: Int): String = {
    val user = createUser(id)
    aggregateService.create(user)
    user.toString
  }

  @PUT
  @Path("failEntity/{id}")
  @Produces(Array(MediaType.TEXT_PLAIN))
  def createCacheAndFail(@PathParam("id") id: Int): String = {
    val user = createUser(id)
    aggregateService.createCacheAndEntityFail(user)
    user.toString
  }

  @PUT
  @Path("failCache/{id}")
  @Produces(Array(MediaType.TEXT_PLAIN))
  def createEntityAndCacheFail(@PathParam("id") id: Int): String = {
    val user = createUser(id)
    aggregateService.createEntityAndCacheFail(user)
    user.toString
  }

  @DELETE
  def deleteAll(): Unit =
    aggregateService.deleteAll()
}

更新系の処理は、AggregateServiceを介して行いますが、参照形はまあ、個々のServiceをそれぞれ使用します。

動かそう

ここまでやったら、JBoss ASを起動して

$ jboss-as-7.1.1.Final/bin/standalone.sh

パッケージングして

> packageWar
[info] Updating {file:/xxxxx/}infinispan-jta...
[info] Resolving org.eclipse.jetty#jetty-jndi;9.1.0.v20131115 ...
[info] Done updating.
[info] Compiling 7 Scala sources to /xxxxx/target/scala-2.10/classes...
[info] Packaging /xxxxx/target/scala-2.10/javaee6-web.war ...
[info] Done packaging.
[success] Total time: 5 s, completed 2014/01/11 1:31:52

デプロイ。

cp target/scala-2.10/javaee6-web.war jboss-as-7.1.1.Final/standalone/deployments/

参照。

$ curl http://localhost:8080/javaee6-web/rest/tx/
Database =>
  
Cache =>
  

まだ0件です。

1件登録。

$ curl -X PUT http://localhost:8080/javaee6-web/rest/tx/1
id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0

確認してみます。

$ curl http://localhost:8080/javaee6-web/rest/tx/
Database =>
  id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0
Cache =>
  id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0

データベースにも、Cacheにも登録されましたね。

では、Cacheに登録した後、データベース登録に失敗するパターン。

$ curl -X PUT http://localhost:8080/javaee6-web/rest/tx/failEntity/2
id = 2, firstName = ワカメ, lastName = 磯野, age = 9, versionNo = 0

例外は潰していますが、登録するはずだった値は返すようにしています。

確認。

$ curl http://localhost:8080/javaee6-web/rest/tx/Database =>
  id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0
Cache =>
  id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0

データは増えていません。

今度は、データベース更新した後、Cache登録の失敗。

$ curl -X PUT http://localhost:8080/javaee6-web/rest/tx/failCache/2
id = 2, firstName = ワカメ, lastName = 磯野, age = 9, versionNo = 0

確認。

$ curl http://localhost:8080/javaee6-web/rest/tx/
Database =>
  id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0
Cache =>
  id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0

やっぱり、データは増えていません。

ちなみに、今回は行儀悪く例外を握りつぶすようにしていますが、普通に上位にそのままスローしても、やっぱりロールバックされますので。

一応、もう1度普通に登録。

$ curl -X PUT http://localhost:8080/javaee6-web/rest/tx/2id = 2, firstName = ワカメ, lastName = 磯野, age = 9, versionNo = 0

確認。

$ curl http://localhost:8080/javaee6-web/rest/tx/
Database =>
  id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0
  id = 2, firstName = ワカメ, lastName = 磯野, age = 9, versionNo = 0
Cache =>
  id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0
  id = 2, firstName = ワカメ, lastName = 磯野, age = 9, versionNo = 0

今度は、増えましたね!!

これで、Java EE環境でInfinispanのトランザクション対応を試すことができました。

今回書いたコードは、こちらにアップしています。

https://github.com/kazuhira-r/javaee6-scala-examples/tree/master/infinispan-jta