CLOVER🍀

That was when it all began.

Infinispan ServerをEmbeddedで動かす

InfinispanのServerディストリビューションでは、以下の3種類のプロトコルを扱うことができます。

正確に言うとWebSocketもあるのですが、実験的サポートが取れないのでいったん除外。

で、これらは通常Infinispan Server(WildFlyベース)を使って触れることになるプロトコルなわけですが、Embedded(組み込み)としても使用できます。

まあ、利用目的はテストとかになるとは思いますが、ちょっとした動作確認やサンプルなどにはよいかも?

Infinispan Server(Hot Rod/Memcached/REST)の構成

ネットワーク関係にNettyを使って実装されたcoreモジュールがあり、そのうえに各種プロトコル用のモジュールが構成されています。また、プログラムの多くはScalaで書かれています。

アーティファクトのIDに、Scalaのメジャーバージョン入ってませんけど。

いずれも、内部的にはEmbeddedなCacheを利用して、そのうえで各種プロトコルをかぶせたCacheの使い方をします。

その他、補足しておくと…

Hot RodはInfinispan独自のバイナリプロトコルです。

Memcachedは、テキストプロトコルのみのサポートとなります。

RESTは、JAX-RS(RESTEasy)の上に構築されています。

ばくっと言うと、そんなところ?

Embeddedな各Serverモジュールを構成するには、こちらのテストコードを参考にするとよいと思います。

https://github.com/infinispan/infinispan/blob/8.2.1.Final/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
https://github.com/infinispan/infinispan/blob/8.2.1.Final/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
https://github.com/infinispan/infinispan/blob/8.2.1.Final/server/rest/src/test/scala/org/infinispan/rest/RestTestingUtil.java

とまあ、前置きはこれくらいにして使っていってみましょう。

今回は、マルチプロジェクト構成で共通モジュールで定義したEntityを、各Serverモジュールで利用してみるような構成にしてみます。

準備

まずは、ビルド定義。

基本的な部分は、こんな感じにしました。
build.sbt

name := "remote-servers-embedded"

val projectScalaVersion = "2.11.8"

scalaVersion := projectScalaVersion

parallelExecution in Test := false

lazy val commonSettings = Seq(
  version := "0.0.1-SNAPSHOT",
  organization := "org.littlewings",
  scalaVersion := projectScalaVersion,
  scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xexperimental"),
  updateOptions := updateOptions.value.withCachedResolution(true),
  parallelExecution in Test := false,
  fork in Test := true,
  libraryDependencies ++= Seq(
    "org.scalatest" %% "scalatest" % "2.2.6" % "test"
  )
)

lazy val root = (project in file("."))
  .aggregate(entity, hotrod, memcached, rest)

lazy val entity = (project in file("entity"))
  .settings(commonSettings: _*)

// hotrod, memcached, restはあとで

ScalaTestは、テストコード用です。

各Serverモジュールの部分は、個別に書いていきます。

共通で使うEntity

とりあえず同じようなサンプルにしようと思いまして、ひとつ共通的なEntityを定義しておきます。

お題は、書籍ということで。
entity/src/main/scala/org/littlewings/infinispan/embeddedserver/Book.scala

package org.littlewings.infinispan.embeddedserver

import scala.beans.BeanProperty

object Book {
  def apply(isbn: String, title: String, price: Int): Book = {
    val book = new Book
    book.isbn = isbn
    book.title = title
    book.price = price
    book
  }
}

@SerialVersionUID(1L)
class Book extends Serializable {
  @BeanProperty
  var isbn: String = _

  @BeanProperty
  var title: String = _

  @BeanProperty
  var price: Int = _
}

このクラスを使って、各Serverモジュールで遊んでみましょう。

Hot Rod

最初はHot Rod。

プロジェクトの定義は、こちら。

lazy val hotrod = (project in file("hotrod"))
  .dependsOn(entity)
  .settings(commonSettings: _*)
  .settings(
    libraryDependencies ++= Seq(
      "org.infinispan" % "infinispan-server-hotrod" % "8.2.1.Final",
      "net.jcip" % "jcip-annotations" % "1.0" % "provided",
      "org.infinispan" % "infinispan-client-hotrod" % "8.2.1.Final" % "test",
      "org.infinispan" % "infinispan-query-dsl" % "8.2.1.Final" % "provided"
    )
  )

Hot RodのServerモジュールは、「infinispan-server-hotrod」を利用します。Hot RodのClientも入れていますが、こちらはテスト用です。Query DSLおよびjcip-annotationsは、コンパイル警告抑制のため…。

Hot Rod ServerをEmbeddedに使ったテストコードは、こちら。
hotrod/src/test/scala/org/littlewings/infinispan/embeddedserver/HotRodServerSpec.scala

package org.littlewings.infinispan.embeddedserver

import org.infinispan.client.hotrod.RemoteCacheManager
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder
import org.infinispan.commons.equivalence.ByteArrayEquivalence
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.server.hotrod.HotRodServer
import org.infinispan.server.hotrod.configuration.HotRodServerConfigurationBuilder
import org.scalatest.{FunSpec, Matchers}

class HotRodServerSpec extends FunSpec with Matchers {
  describe("Hot Rod Embedded Server") {
    it("getting started") {
      // Embedded Cache setup
      val embeddedCacheManager = new DefaultCacheManager
      embeddedCacheManager
        .defineConfiguration(
          "hotRodCache",
          new org.infinispan.configuration.cache.ConfigurationBuilder()
            .dataContainer()
            .keyEquivalence(ByteArrayEquivalence.INSTANCE)
            .valueEquivalence(ByteArrayEquivalence.INSTANCE)
            .build
        )

      // Hot Rod Server setup
      val hotRodServerHost = "localhost"
      val hotRodServerPort = 11222
      val hotRodServer = new HotRodServer
      hotRodServer.start(
        new HotRodServerConfigurationBuilder()
          .host(hotRodServerHost)
          .port(hotRodServerPort)
          .workerThreads(Runtime.getRuntime.availableProcessors) // デフォルト値
          .build,
        embeddedCacheManager
      )

      // Hot Rod Client setup
      val remoteCacheManager = new RemoteCacheManager(
        new ConfigurationBuilder().addServers(s"${hotRodServerHost}:${hotRodServerPort}").build
      )
      val remoteCache = remoteCacheManager.getCache[String, Book]("hotRodCache")

      // use Cache
      val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5337)

      remoteCache.put(book.isbn, book)

      remoteCache should have size (1)
      remoteCache.get(book.isbn).title should be("Infinispan Data Grid Platform Definitive Guide")
      remoteCache.get(book.isbn).price should be(5337)

      // resource clean up
      remoteCache.stop()
      remoteCacheManager.stop()
      hotRodServer.stop
      embeddedCacheManager.stop()
    }
  }
}

どのServerモジュールもそうですが、内部的にはEmbedded Cacheを使っているので、EmbeddedCacheManagerが必要です。事前にCacheの定義を行う必要があるかどうかはServerモジュール次第ですが、Hot Rodの場合必要となります。

      // Embedded Cache setup
      val embeddedCacheManager = new DefaultCacheManager
      embeddedCacheManager
        .defineConfiguration(
          "hotRodCache",
          new org.infinispan.configuration.cache.ConfigurationBuilder()
            .dataContainer()
            .keyEquivalence(ByteArrayEquivalence.INSTANCE)
            .valueEquivalence(ByteArrayEquivalence.INSTANCE)
            .build
        )

注意点としては、Cacheを定義する際にKeyEquivalenceとValueEquiValenceにByteArrayEquivalence.INSTANCEを設定しておくことです。

これをやっておかないと、putしたエントリをgetで取得する時などに、うまく比較できなくなって「データが入っているし、Cacheにエントリ数分sizeがあるのに、取得することができない」みたいな事態になります…。

こちらを見て、設定が必要なことに気付きました。
https://github.com/infinispan/infinispan/blob/8.2.1.Final/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala#L345-L350

もしくは、Infinispan Serverモジュールの設定を見ると、AnyServerEquivalenceを設定しても良さそうです。
https://github.com/infinispan/infinispan/blob/8.2.1.Final/server/integration/infinispan/src/main/resources/infinispan-defaults.xml#L33-L34

どちらを使うのがいいのかな…。

続いて、Hot Rod Serverのセットアップ。

      // Hot Rod Server setup
      val hotRodServerHost = "localhost"
      val hotRodServerPort = 11222
      val hotRodServer = new HotRodServer
      hotRodServer.start(
        new HotRodServerConfigurationBuilder()
          .host(hotRodServerHost)
          .port(hotRodServerPort)
          .workerThreads(Runtime.getRuntime.availableProcessors) // デフォルト値
          .build,
        embeddedCacheManager
      )

HotRodServerConfigurationBuilderを使って、リッスンポートなどを指定していきます。WorkerThreadはNettyの設定に直結しますが、デフォルトはCPU数(Runtime.getRuntime.availableProcessors)で設定されています。

HotRodServer#startでServerが起動するので、あとはClient側のコードでアクセスします。

      // Hot Rod Client setup
      val remoteCacheManager = new RemoteCacheManager(
        new ConfigurationBuilder().addServers(s"${hotRodServerHost}:${hotRodServerPort}").build
      )
      val remoteCache = remoteCacheManager.getCache[String, Book]("hotRodCache")

      // use Cache
      val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5337)

      remoteCache.put(book.isbn, book)

      remoteCache should have size (1)
      remoteCache.get(book.isbn).title should be("Infinispan Data Grid Platform Definitive Guide")
      remoteCache.get(book.isbn).price should be(5337)

最後に、起動したServerなどをstopで停止して終了です。

      // resource clean up
      remoteCache.stop()
      remoteCacheManager.stop()
      hotRodServer.stop
      embeddedCacheManager.stop()

Memcached

続いて、Memcached

プロジェクト定義は、こちら。

lazy val memcached = (project in file("memcached"))
  .dependsOn(entity)
  .settings(commonSettings: _*)
  .settings(
    libraryDependencies ++= Seq(
      "org.infinispan" % "infinispan-server-memcached" % "8.2.1.Final",
      "net.jcip" % "jcip-annotations" % "1.0" % "provided",
      "net.spy" % "spymemcached" % "2.12.1" % "test"
    )
  )

Memcached Serverを使う場合は、「infinispan-server-memcached」モジュールを利用します。

Memcachedプロトコル(テキストプロトコルのみですが)を利用できるので、Client側は「spymemcached」を使うことにします。

Memcached Serverを、Embeddedに使ったテストコードは、こちら。
memcached/src/test/scala/org/littlewings/infinispan/embeddedserver/MemcachedServerSpec.scala

package org.littlewings.infinispan.embeddedserver

import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit

import net.spy.memcached.MemcachedClient
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.server.memcached.MemcachedServer
import org.infinispan.server.memcached.configuration.MemcachedServerConfigurationBuilder
import org.scalatest.{FunSpec, Matchers}

class MemcachedServerSpec extends FunSpec with Matchers {
  describe("Memcached Embedded Server") {
    it("getting started") {
      // Embeddd Cache setup
      val embeddecCacheManager = new DefaultCacheManager

      // Memcached Server setup
      val memcachedHost = "localhost"
      val memcachedPort = 11211
      val memcachedServer = new MemcachedServer
      memcachedServer.start(
        new MemcachedServerConfigurationBuilder()
          .defaultCacheName("memcachedCache")
          .host(memcachedHost)
          .port(memcachedPort)
          .workerThreads(Runtime.getRuntime.availableProcessors) // デフォルト値
          .build,
        embeddecCacheManager
      )

      // Memcached Client setup
      val client = new MemcachedClient(new InetSocketAddress(memcachedHost, memcachedPort))

      // use Cache
      val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5337)

      client.set(book.isbn, 3, book)
      client.get(book.isbn).asInstanceOf[Book].title should be("Infinispan Data Grid Platform Definitive Guide")
      client.get(book.isbn).asInstanceOf[Book].price should be(5337)

      TimeUnit.SECONDS.sleep(5L)

      client.get(book.isbn) should be(null)

      // resource clean up
      client.shutdown()
      memcachedServer.stop
      embeddecCacheManager.stop()
    }
  }
}

基本的な構成は、Hot Rodの時と同じですが、利用するEmbedded Cacheは、事前定義しておくのは必須ではありません。もちろん、定義しておいてもかまいませんが。

      // Embeddd Cache setup
      val embeddecCacheManager = new DefaultCacheManager

Server側のセットアップコードもHot Rodの時と似たものになりますが、こちらはどのCacheを利用するか、起動時に設定しておく必要があります。

      // Memcached Server setup
      val memcachedHost = "localhost"
      val memcachedPort = 11211
      val memcachedServer = new MemcachedServer
      memcachedServer.start(
        new MemcachedServerConfigurationBuilder()
          .defaultCacheName("memcachedCache")
          .host(memcachedHost)
          .port(memcachedPort)
          .workerThreads(Runtime.getRuntime.availableProcessors) // デフォルト値
          .build,
        embeddecCacheManager
      )

なお、利用するCacheの名前を何も指定しなかった場合は、ホントのデフォルトのCache(org.infinispan.commons.api.BasicCacheContainer#DEFAULT_CACHE_NAME)が利用されるようです(実体は「___defaultcache」)。

MemcachedServer#startでServerが起動するので、あとはMemcachedのClientでアクセスします。

      // Memcached Client setup
      val client = new MemcachedClient(new InetSocketAddress(memcachedHost, memcachedPort))

      // use Cache
      val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5337)

      client.set(book.isbn, 3, book)
      client.get(book.isbn).asInstanceOf[Book].title should be("Infinispan Data Grid Platform Definitive Guide")
      client.get(book.isbn).asInstanceOf[Book].price should be(5337)

      TimeUnit.SECONDS.sleep(5L)

      client.get(book.isbn) should be(null)

使い終わったら、stopしていきます。spymemcachedについては、shutdownですが。

      // resource clean up
      client.shutdown()
      memcachedServer.stop
      embeddecCacheManager.stop()

REST

最後は、REST。

プロジェクト定義は、こちら。

lazy val rest = (project in file("rest"))
  .dependsOn(entity)
  .settings(commonSettings: _*)
  .settings(
    libraryDependencies ++= Seq(
      "org.infinispan" % "infinispan-server-rest" % "8.2.1.Final" classifier "classes" excludeAll (ExclusionRule(organization = "org.apache.logging.log4j")),
      "net.jcip" % "jcip-annotations" % "1.0" % "provided",
      "org.jboss.resteasy" % "resteasy-client" % "3.0.11.Final" % "test",
      "org.jboss.resteasy" % "resteasy-jackson2-provider" % "3.0.11.Final" % "test"
    )
  )

REST Serverを使う場合は、「infinispan-server-rest」を使用します。

が、REST ServerはWARとして提供される形態でMaven Centralにアップロードされています。JARとして取得する場合は、classifierで「classes」を指定します。

どこかで、WAR形態での利用も試してみたいところですね…。

あと、RESTEasyがSLF4Jを利用するのですが、Infinispanが(オプションで)依存するLog4j2と競合するみたいなので、除外。

REST Clientとしては、JAX-RS Client(RESTEasy)を使用します。JSON変化の部分はJacksonを使用する方向で。RESTEasyのバージョンが最新ではありませんが、これはInfinispanのREST Serverが使用しているRESTEasyのバージョンに合わせています。

REST Serverを、Embeddedに使ったテストコードは、こちら。
rest/src/test/scala/org/littlewings/infinispan/embeddedserver/RestServerSpec.scala

package org.littlewings.infinispan.embeddedserver

import javax.ws.rs.client.{ClientBuilder, Entity}
import javax.ws.rs.core.Response

import org.infinispan.configuration.cache.ConfigurationBuilder
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.rest.NettyRestServer
import org.infinispan.rest.configuration.RestServerConfigurationBuilder
import org.scalatest.{FunSpec, Matchers}

class RestServerSpec extends FunSpec with Matchers {
  describe("Rest Embedded Server") {
    it("getting started") {
      // Embedded Cache setup
      val embeddedCacheManager = new DefaultCacheManager
      embeddedCacheManager.defineConfiguration("restCache", new ConfigurationBuilder().build)

      // Rest Server setup
      val restServerHost = "localhost"
      val restServerPort = 8080
      val restServer =
        NettyRestServer(
          new RestServerConfigurationBuilder()
            .host(restServerHost)
            .port(restServerPort)
            .build(),
          embeddedCacheManager
        )
      restServer.start()

      // use Cache
      val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5337)

      val client = ClientBuilder.newBuilder().build

      val putResponse =
        client
          .target(s"http://${restServerHost}:${restServerPort}/rest/restCache/${book.isbn}")
          .request
          .put(Entity.json(book))
      putResponse.getStatus should be(Response.Status.OK.getStatusCode)
      putResponse.close()

      val responseBook =
        client
        .target(s"http://${restServerHost}:${restServerPort}/rest/restCache/${book.isbn}")
        .request
          .get(classOf[Book])

      responseBook.title should be("Infinispan Data Grid Platform Definitive Guide")
      responseBook.price should be(5337)

      // resource clean up
      client.close()
      restServer.stop()
      embeddedCacheManager.stop()
    }
  }
}

REST Serverの場合は、事前にEmbeddedなCacheを定義しておく必要があります。

      // Embedded Cache setup
      val embeddedCacheManager = new DefaultCacheManager
      embeddedCacheManager.defineConfiguration("restCache", new ConfigurationBuilder().build)

また、Serverの起動は他の形態とNettyRestServer#applyでConfigurationとEmbeddedCacheManagerを渡した後、引数なしのNettyRestServer#startを呼び出します。

      // Rest Server setup
      val restServerHost = "localhost"
      val restServerPort = 8080
      val restServer =
        NettyRestServer(
          new RestServerConfigurationBuilder()
            .host(restServerHost)
            .port(restServerPort)
            .build(),
          embeddedCacheManager
        )
      restServer.start()

REST Serverの内部では、「resteasy-netty4」が使われていますが、あまりNettyまわりの設定を触ることはできなさそうな雰囲気です。
https://github.com/infinispan/infinispan/blob/8.2.1.Final/server/rest/src/main/scala/org/infinispan/rest/NettyRestServer.scala#L54-L61

NettyJaxrsServerはNettyRestServer内部に構成も起動も閉じているので、スレッド数などのNettyまわりの設定はほぼ触れない感じです。

ConfirationBuilderも、継承関係(インターフェースの実装ですが)が他と違うんですよね。
https://github.com/infinispan/infinispan/blob/8.2.1.Final/server/rest/src/main/scala/org/infinispan/rest/configuration/RestServerConfigurationBuilder.java#L14

Hot RodやMemcachedの場合は、ProtocolServerConfigurationBuilderというクラスのサブクラスになっています。
https://github.com/infinispan/infinispan/blob/8.2.1.Final/server/hotrod/src/main/scala/org/infinispan/server/hotrod/configuration/HotRodServerConfigurationBuilder.java#L15
https://github.com/infinispan/infinispan/blob/8.2.1.Final/server/memcached/src/main/scala/org/infinispan/server/memcached/configuration/MemcachedServerConfigurationBuilder.java#L12

REST Server起動後は、ふつうにREST APIでアクセスすればOKです。

      // use Cache
      val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5337)

      val client = ClientBuilder.newBuilder().build

      val putResponse =
        client
          .target(s"http://${restServerHost}:${restServerPort}/rest/restCache/${book.isbn}")
          .request
          .put(Entity.json(book))
      putResponse.getStatus should be(Response.Status.OK.getStatusCode)
      putResponse.close()

      val responseBook =
        client
        .target(s"http://${restServerHost}:${restServerPort}/rest/restCache/${book.isbn}")
        .request
          .get(classOf[Book])

      responseBook.title should be("Infinispan Data Grid Platform Definitive Guide")
      responseBook.price should be(5337)

最後に、NettyRestServerなどをstopしてお終い。

      // resource clean up
      client.close()
      restServer.stop()
      embeddedCacheManager.stop()

まとめ

InfinispanのServer各種モジュールを、Embeddedに使ってみました。

通常はInfinispan Serverディストリビューションとしてダウンロードして使用するものだと思いますが、テスト目的などの利用によいのでは?と。あと、こういう使い方ができると各Serverモジュールでハマった時などに追いやすいのではないかな?とも思います。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-servers-embedded