久しぶりに、Java EEを交えたネタ。以前Infinispanのトランザクション対応をスタンドアロンで試しましたが、今度はJava EE環境(JBoss AS 7.1)で試してみたいと思います。
せっかくJava EE環境でやるのなら、JPAと組み合わせてCacheと一緒にトランザクション管理してみましょう。
準備、設定ファイル
というわけで、準備。
build.sbt
name := "infinispan-jta" version := "0.0.1-SNAPSHOT" scalaVersion := "2.10.3" organization := "org.littlewings" scalacOptions ++= Seq("-Xlint", "-deprecation") seq(webSettings: _*) artifactName := { (version: ScalaVersion, module: ModuleID, artifact: Artifact) => //artifact.name + "." + artifact.extension "javaee6-web." + artifact.extension } libraryDependencies ++= Seq( "org.eclipse.jetty" % "jetty-webapp" % "9.1.0.v20131115" % "container", "org.eclipse.jetty" % "jetty-plus" % "9.1.0.v20131115" % "container", "javax" % "javaee-web-api" % "6.0" % "provided", "mysql" % "mysql-connector-java" % "5.1.26" % "provided", "org.infinispan" % "infinispan-core" % "6.0.1.Final" excludeAll( ExclusionRule(organization = "org.jgroups", name = "jgroups"), ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling-river"), ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling"), ExclusionRule(organization = "org.jboss.logging", name = "jboss-logging"), ExclusionRule(organization = "org.jboss.spec.javax.transaction", name = "jboss-transaction-api_1.1_spec") ), "org.jgroups" % "jgroups" % "3.4.1.Final", "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.1_spec" % "1.0.1.Final", "org.jboss.marshalling" % "jboss-marshalling-river" % "1.3.18.GA", "org.jboss.marshalling" % "jboss-marshalling" % "1.3.18.GA", "org.jboss.logging" % "jboss-logging" % "3.1.2.GA", "net.jcip" % "jcip-annotations" % "1.0" )
今回はWARファイルを作ってデプロイするので、xsbt-web-pluginも使用します。
project/plugins.sbt
addSbtPlugin("com.earldouglas" % "xsbt-web-plugin" % "0.6.0")
トランザクションに対応させた、Infinispanの設定ファイル。今回は、分離レベルをREPEATABLE_READにして、楽観的ロックを使用できるようにしました。
src/main/resources/infinispan-optimistic-tx.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd" xmlns="urn:infinispan:config:6.0"> <global> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" /> <shutdown hookBehavior="REGISTER"/> </global> <default> <transaction transactionMode="TRANSACTIONAL" lockingMode="OPTIMISTIC" autoCommit="false" /> <versioning enabled="true" versioningScheme="SIMPLE" /> <locking isolationLevel="REPEATABLE_READ" writeSkewCheck="true" /> </default> </infinispan>
クラスタリングは、今回はパス。
永続性ユニットの設定。まあ、XAで試すには…ですが、MySQLを使用します。
src/main/resources/META-INF/persistence.xml
<?xml version="1.0" encoding="UTF-8"?> <persistence xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd" version="2.0"> <persistence-unit name="javaee6.web.pu" transaction-type="JTA"> <provider>org.hibernate.ejb.HibernatePersistence</provider> <jta-data-source>java:jboss/datasources/mysqlXaDs</jta-data-source> <properties> <property name="hibernate.dialect" value="org.hibernate.dialect.MySQL5InnoDBDialect" /> <property name="hibernate.show_sql" value="true" /> <property name="hibernate.format_sql" value="true" /> </properties> </persistence-unit>
あとは、CDIを使うために以下のファイルを作成します。
src/main/webapp/WEB-INF/beans.xml
Java EE 6環境ですので。
Infinispan×CDI
今回は、InfinispanのCDIモジュールを使うのではなく、普通に@Produces、@Disposesアノテーションを使って管理することにしました。
src/main/scala/org/littlewings/infinispan/jta/inject/CacheProducer.scala
package org.littlewings.infinispan.jta.inject import javax.enterprise.context.ApplicationScoped import javax.enterprise.inject.{Disposes, Produces} import javax.inject.Inject import org.infinispan.Cache import org.infinispan.manager.{DefaultCacheManager, EmbeddedCacheManager} import org.littlewings.infinispan.jta.entity.User class CacheProducer { @Produces @ApplicationScoped def getEmbeddedCacheManager: EmbeddedCacheManager = new DefaultCacheManager("infinispan-optimistic-tx.xml") def stopEmbeddedCacheManager(@Disposes manager: EmbeddedCacheManager): Unit = manager.stop() @Produces @ApplicationScoped def getCache(@Inject manager: EmbeddedCacheManager): Cache[Int, User] = manager.getCache() def stopCache(@Disposes cache: Cache[Int, User]): Unit = cache.stop() }
Userというクラスは、今回定義したJPAで使用するエンティティです。
Entity
永続化対象の、Entityクラス。
src/main/scala/org/littlewings/infinispan/jta/entity/User.scala
package org.littlewings.infinispan.jta.entity import scala.beans.BeanProperty import javax.persistence.{Column, Entity, Id, Table, Version} object User { def apply(id: Int, firstName: String, lastName: String, age: Int): User = { val user = new User user.id = id user.firstName = firstName user.lastName = lastName user.age = age user } } @SerialVersionUID(1L) @Entity @Table(name = "user") class User extends Serializable { @Id @BeanProperty var id: Int = _ @Column(name = "first_name") @BeanProperty var firstName: String = _ @Column(name = "last_name") @BeanProperty var lastName: String = _ @Column @BeanProperty var age: Int = _ @Column(name = "version_no") @Version @BeanProperty var versionNo: Int = _ override def toString: String = s"id = $id, firstName = $firstName, lastName = $lastName, age = $age, versionNo = $versionNo" }
EJB
Entityを操作する、Stateless Bean。
src/main/scala/org/littlewings/infinispan/jta/service/UserService.scala
package org.littlewings.infinispan.jta.service import scala.collection.JavaConverters._ import javax.ejb.{LocalBean, Stateless} import javax.persistence.{EntityManager, PersistenceContext} import org.littlewings.infinispan.jta.entity.User @Stateless @LocalBean class UserService { @PersistenceContext private var em: EntityManager = _ def create(user: User): Unit = em.persist(user) def createFail(user: User): Unit = { em.persist(user) throw new RuntimeException("Oops!!") } def udpate(user: User): User = em.merge(user) def remove(user: User): Unit = em.remove(em.merge(user)) def findById(id: Int): User = em.find(classOf[User], id) def findAllOrderById: Iterable[User] = em .createQuery("""|SELECT u | FROM User u | ORDER BY u.id ASC""".stripMargin) .getResultList .asScala .asInstanceOf[Iterable[User]] def removeAll(): Unit = em .createQuery("DELETE FROM User") .executeUpdate() }
ひとつ、失敗するようなメソッドが入っています。
def createFail(user: User): Unit = { em.persist(user) throw new RuntimeException("Oops!!") }
Cacheを操作するための、Stateless Bean。
src/main/scala/org/littlewings/infinispan/jta/service/CacheService.scala
package org.littlewings.infinispan.jta.service import scala.collection.JavaConverters._ import javax.ejb.{LocalBean, Stateless} import javax.inject.Inject import org.infinispan.Cache import org.littlewings.infinispan.jta.entity.User @Stateless @LocalBean class CacheService { @Inject private var cache: Cache[Int, User] = _ def get(id: Int): User = cache.get(id) def put(user: User): Unit = cache.put(user.id, user) def putFail(user: User): Unit = { cache.put(user.id, user) throw new RuntimeException("Oops!") } def clear(): Unit = cache.clear() def findAllOrderById: Iterable[User] = cache.values.asScala.toSeq.sortWith { _.id < _.id } }
こちらも、やっぱり失敗する用のメソッドが入っています。
そして、上記2つのEJBをまとめて使用するStateless Bean。
src/main/scala/org/littlewings/infinispan/jta/service/AggregateService.scala
package org.littlewings.infinispan.jta.service import javax.ejb.{EJB, LocalBean, Stateless} import org.littlewings.infinispan.jta.entity.User @Stateless @LocalBean class AggregateService { @EJB private var userService: UserService = _ @EJB private var cacheService: CacheService = _ def create(user: User): Unit = { cacheService.put(user) userService.create(user) } def createCacheAndEntityFail(user: User): Unit = { cacheService.put(user) try { userService.createFail(user) } catch { case e: Exception => } } def createEntityAndCacheFail(user: User): Unit = { userService.create(user) try { cacheService.putFail(user) } catch { case e: Exception => } } def deleteAll(): Unit = { userService.removeAll() cacheService.clear() } }
Cacheに保存してその後失敗するパターンと、その逆のパターンを用意しています。
def createCacheAndEntityFail(user: User): Unit = { cacheService.put(user) try { userService.createFail(user) } catch { case e: Exception => } } def createEntityAndCacheFail(user: User): Unit = { userService.create(user) try { cacheService.putFail(user) } catch { case e: Exception => } }
ロールバック用です。中で送出するExceptionを捕まえちゃってますが、それでも同じトランザクションに参加しているので、前のリソースに対する更新もロールバックされることを確認します。
JAX-RS
こちらは、さらっと。JAX-RSの有効化。
src/main/scala/org/littlewings/infinispan/jta/jaxrs/RestApplication.scala
package org.littlewings.infinispan.jta.jaxrs import javax.ws.rs.ApplicationPath import javax.ws.rs.core.Application @ApplicationPath("/rest") class RestApplication extends Application
リソースクラス。
src/main/scala/org/littlewings/infinispan/jta/jaxrs/TxResource.scala
package org.littlewings.infinspan.jta.jaxrs import javax.inject.Inject import javax.ws.rs.{DELETE, GET, Path, PathParam, Produces, PUT} import javax.ws.rs.core.MediaType import org.infinispan.Cache import org.littlewings.infinispan.jta.entity.User import org.littlewings.infinispan.jta.service.{AggregateService, CacheService, UserService} @Path("tx") class TxResource { @Inject private var userService: UserService = _ @Inject private var cacheService: CacheService = _ @Inject private var aggregateService: AggregateService = _ private def createUser(id: Int): User = id match { case 1 => User(1, "カツオ", "磯野", 11) case 2 => User(2, "ワカメ", "磯野", 9) case 3 => User(3, "タラオ", "フグ田", 3) } @GET @Path("{id}") @Produces(Array(MediaType.TEXT_PLAIN)) def index(@PathParam("id") id: Int): String = s"""|Database => ${userService.findById(id)} |Cache => ${cacheService.get(id)} |""".stripMargin @GET @Produces(Array(MediaType.TEXT_PLAIN)) def index: String = s"""|Database => |${userService.findAllOrderById.mkString(" ", System.lineSeparator + " ", "")} |Cache => |${cacheService.findAllOrderById.mkString(" ", System.lineSeparator + " " , "")} |""".stripMargin @PUT @Path("{id}") @Produces(Array(MediaType.TEXT_PLAIN)) def create(@PathParam("id") id: Int): String = { val user = createUser(id) aggregateService.create(user) user.toString } @PUT @Path("failEntity/{id}") @Produces(Array(MediaType.TEXT_PLAIN)) def createCacheAndFail(@PathParam("id") id: Int): String = { val user = createUser(id) aggregateService.createCacheAndEntityFail(user) user.toString } @PUT @Path("failCache/{id}") @Produces(Array(MediaType.TEXT_PLAIN)) def createEntityAndCacheFail(@PathParam("id") id: Int): String = { val user = createUser(id) aggregateService.createEntityAndCacheFail(user) user.toString } @DELETE def deleteAll(): Unit = aggregateService.deleteAll() }
更新系の処理は、AggregateServiceを介して行いますが、参照形はまあ、個々のServiceをそれぞれ使用します。
動かそう
ここまでやったら、JBoss ASを起動して
$ jboss-as-7.1.1.Final/bin/standalone.sh
パッケージングして
> packageWar [info] Updating {file:/xxxxx/}infinispan-jta... [info] Resolving org.eclipse.jetty#jetty-jndi;9.1.0.v20131115 ... [info] Done updating. [info] Compiling 7 Scala sources to /xxxxx/target/scala-2.10/classes... [info] Packaging /xxxxx/target/scala-2.10/javaee6-web.war ... [info] Done packaging. [success] Total time: 5 s, completed 2014/01/11 1:31:52
デプロイ。
cp target/scala-2.10/javaee6-web.war jboss-as-7.1.1.Final/standalone/deployments/
参照。
$ curl http://localhost:8080/javaee6-web/rest/tx/ Database => Cache =>
まだ0件です。
1件登録。
$ curl -X PUT http://localhost:8080/javaee6-web/rest/tx/1 id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0
確認してみます。
$ curl http://localhost:8080/javaee6-web/rest/tx/ Database => id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0 Cache => id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0
データベースにも、Cacheにも登録されましたね。
では、Cacheに登録した後、データベース登録に失敗するパターン。
$ curl -X PUT http://localhost:8080/javaee6-web/rest/tx/failEntity/2 id = 2, firstName = ワカメ, lastName = 磯野, age = 9, versionNo = 0
例外は潰していますが、登録するはずだった値は返すようにしています。
確認。
$ curl http://localhost:8080/javaee6-web/rest/tx/Database => id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0 Cache => id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0
データは増えていません。
今度は、データベース更新した後、Cache登録の失敗。
$ curl -X PUT http://localhost:8080/javaee6-web/rest/tx/failCache/2 id = 2, firstName = ワカメ, lastName = 磯野, age = 9, versionNo = 0
確認。
$ curl http://localhost:8080/javaee6-web/rest/tx/ Database => id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0 Cache => id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0
やっぱり、データは増えていません。
ちなみに、今回は行儀悪く例外を握りつぶすようにしていますが、普通に上位にそのままスローしても、やっぱりロールバックされますので。
一応、もう1度普通に登録。
$ curl -X PUT http://localhost:8080/javaee6-web/rest/tx/2id = 2, firstName = ワカメ, lastName = 磯野, age = 9, versionNo = 0
確認。
$ curl http://localhost:8080/javaee6-web/rest/tx/ Database => id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0 id = 2, firstName = ワカメ, lastName = 磯野, age = 9, versionNo = 0 Cache => id = 1, firstName = カツオ, lastName = 磯野, age = 11, versionNo = 0 id = 2, firstName = ワカメ, lastName = 磯野, age = 9, versionNo = 0
今度は、増えましたね!!
これで、Java EE環境でInfinispanのトランザクション対応を試すことができました。
今回書いたコードは、こちらにアップしています。
https://github.com/kazuhira-r/javaee6-scala-examples/tree/master/infinispan-jta