CLOVER🍀

That was when it all began.

Infinispan(Embedded Mode)でクラスタを構成してみる

Infinispan 7.0.0.Finalがリリースされてから少し間が空きましたが、その間にちょっとバージョンアップして、7.0.2.Finalになっていたようですね。

7.0.0.FinalがリリースされてからすぐGetting Started的なものを書いたのですが、今度はクラスタを構成するエントリを書きたいと思います。

Infinispanでクラスタを構成するとは?

Infinispanはローカルキャッシュだけではなく、複数のJavaVM、マシンに跨ったキャッシュを構成することができます。ローカルキャッシュを含めると、以下の4つの形態から選択することができます。

  • Local Mode … 単一のJavaVM上で使用するキャッシュです
  • Replicated Mode … クラスタに参加した全Nodeに対して、登録されたエントリを複製するキャッシュです
  • Distribution Mode … キャッシュに登録されたエントリを、指定Node数分コピーして保持するモードです
  • Invalidation Mode … クラスタを構成しますが、データの更新があった時に他のNodeが保持しているデータを削除するキャッシュです

Local Modeの時は、高機能なキャッシュライブラリとして使用できます。Replicated ModeとDistribution Modeの時は複数のNodeに跨ってデータを保持しますが、それぞれ特徴があるので用途に応じてModeを選択します。Invalidation Modeは、JPAのL2Cacheで使用されたりするようです。

なお、Infinispanでクラスタを構成する際には、基盤としてJGroupsを使うようになっています。

では、使ってみましょう。あ、今回はEmbedded Cacheとして使用しますので。

準備

sbtの設定としては、以下のようなものを用意しました。
build.sbt

name := "embedded-clustering-starter"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.4"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

incOptions := incOptions.value.withNameHashing(true)

updateOptions := updateOptions.value.withCachedResolution(true)

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "7.0.2.Final",
  "net.jcip" % "jcip-annotations" % "1.0" % "provided"
)

今回から、sbt 0.13.7です。

設定ファイル

Infinispanの設定ファイルとしては、以下のようなファイルを用意しました。
src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:7.0 http://www.infinispan.org/schemas/infinispan-config-7.0.xsd"
    xmlns="urn:infinispan:config:7.0">
  <jgroups>
    <stack-file name="udp" path="default-configs/default-jgroups-udp.xml" />
  </jgroups>

  <cache-container name="clusteringCacheManager">
    <transport cluster="cluster" stack="udp" node-name="${nodeName}" />
    <distributed-cache name="distCache" />
    <replicated-cache name="replCache" />
    <jmx />
  </cache-container>
</infinispan>

少し説明を。

まず、クラスタを構成する際に必要となるJGroupsの設定ですが、以下で指定をしています。

  <jgroups>
    <stack-file name="udp" path="default-configs/default-jgroups-udp.xml" />
  </jgroups>

今回はInfinispan自身に含まれる、デフォルトの設定ファイルを使用しました。なお、プロトコルTCPUDPから指定しますが、UDPを使用しています。

続いて、cache-containerの定義。

  <cache-container name="clusteringCacheManager">
    <transport cluster="cluster" stack="udp" node-name="${nodeName}" />
    ...
  </cache-container>

transportで、クラスタを構成するための設定を指定します。ここで、stack属性には先で定義したstack-fileのname属性で決めた名前を指定します。

クラスタに参加する時のNode名はデフォルトで勝手に決まるのですが、今回はシステムプロパティから設定するようにしました。「$」を使って設定ファイル中で参照することが可能です。

最後にCacheの定義です。こちらがDistribution ModeのCache。

    <distributed-cache name="distCache" />

こちらがReplicated ModeのCacheです。

    <replicated-cache name="replCache" />

まず使ってみるのなら、これくらいで。

プログラムを書く

では、定義したCacheを使うプログラムを書いてみます。今回は、ちょっとしたコマンドラインツールっぽくしてみました。
src/main/scala/org/littlewings/infinispan/clustering/ClusteringCacheDemo.scala

package org.littlewings.infinispan.clustering

import scala.collection.JavaConverters._
import scala.io.StdIn

import org.infinispan.manager.DefaultCacheManager

object ClusteringCacheDemo {
  def main(args: Array[String]): Unit = {
    val (nodeName, cacheName) = (args(0), args(1))

    System.setProperty("nodeName", nodeName)

    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, String](cacheName)

    Iterator
      .continually(StdIn.readLine(s"$nodeName> "))
      .withFilter(l => l != null && !l.isEmpty)
      .takeWhile(_ != "exit")
      .map(_.split(" +").toList)
      .foreach {
        case Nil =>
        case "put" :: key :: value :: Nil =>
          cache.put(key, value)
          println(s"Putted, $key : $value")
        case "get" :: key :: Nil =>
          println(s"Get, Key[$key] => ${cache.get(key)}")
        case "size" :: Nil =>
          println(s"Size = ${cache.size}")
        case "keys" :: Nil =>
          println(s"Keys:")
          cache.keySet.asScala.foreach(k => println(s" $k"))
        case "locate" :: Nil =>
          println("Locate:")
          val dm = cache.getAdvancedCache.getDistributionManager
          cache.keySet.asScala.foreach { k =>
            val primary = dm.getPrimaryLocation(k)
            val locate = dm.locate(k)
            println(s" Key[$k]  Primary: $primary, Locate: $locate")
          }
        case command =>
          println(s"Unknown command, [${command(0)}]")
      }

    println(s"Exit CacheServer[$nodeName]")

    cache.stop()
    manager.stop()
  }
}

引数として、Node名とCache名を取ります。

そして起動した後に入力待ちになり、put、get、size、keys、locate(エントリの配置状態を表示)というコマンドを理解できるような簡素なものです。exitと入力すると終了します。

Distribution Modeを使ってみる

それでは、作成したプログラムを使ってNodeを3つ起動してみます。まずはDistribution Modeからいってみましょう。

# Node1
$ sbt "run node1 distCache"

# Node2
$ sbt "run node2 distCache"

# Node3
$ sbt "run node3 distCache"

OSのネットワーク設定がチューニングされていない場合は、起動時にJGroupsからこんな感じで怒られます。

警告: JGRP000015: the send buffer of socket DatagramSocket was set to 640KB, but the OS only allocated 212.99KB. This might lead to performance problems. Please set your max send buffer in the OS correctly (e.g. net.core.wmem_max on Linux)

が、今回は気にしないことにします。

起動中は、クラスタに参加するNodeが増えていく度にこういう表示が行われます。

INFO: ISPN000094: Received new cluster view for channel cluster: [node1-13242|2] (3) [node1-13242, node2-30832, node3-55319]

こちらは、全部で3Node入った時の表示です。

起動後は、それぞれ入力待ちになります。

# Node 1
node1> 

# Node 2
node2> 

# Node 3
node3> 

それでは、Node 1でエントリを登録してみます。

node1> put key1 value1
Putted, key1 : value1
node1> put key2 value2 
Putted, key2 : value2
node1> put key3 value3
Putted, key3 : value3

これを、Node 2から読み出してみます。

node2> get key2
Get, Key[key2] => value2

ちゃんと読み出せましたね。

Node 3でエントリを登録してみます。

node3> put key4 value4
Putted, key4 : value4

Node 1で、Cacheのsizeとキー一覧を出力してみます。

node1> size
Size = 4
node1> keys
Keys:
 key1
 key2
 key3
 key4

*以前のInfinispanでは、Distribution Modeの時に自NodeにないエントリはkeySetやsizeに反映されなかったのですが、改善されたようです(7.0のRelease Notes参照?)

これらが、どのような配置になっているかを確認してみます。

node1> locate
Locate:
 Key[key1]  Primary: node1-13242, Locate: [node1-13242, node3-55319]
 Key[key2]  Primary: node1-13242, Locate: [node1-13242, node2-30832]
 Key[key3]  Primary: node1-13242, Locate: [node1-13242, node2-30832]
 Key[key4]  Primary: node3-55319, Locate: [node3-55319, node1-13242]

プライマリと、プライマリを含めたデータの配置先が表示されます。今回は、データの保持数はデフォルトの2なので、2つのNodeに保持(片方がバックアップ扱い)されるようになっています。

では、ここでNode 3を終了させてみましょう。

node3> exit

他のNodeは、ひとつNodeが抜けたことを検出します。

node1> 12 05, 2014 6:10:53 午後 org.infinispan.topology.ClusterTopologyManagerImpl broadcastRebalanceStart
INFO: ISPN000310: Starting cluster-wide rebalance for cache distCache, topology CacheTopology{id=6, rebalanceId=3, currentCH=DefaultConsistentHash{ns = 60, owners = (2)[node1-13242: 30+10, node2-30832: 30+10]}, pendingCH=DefaultConsistentHash{ns = 60, owners = (2)[node1-13242: 30+30, node2-30832: 30+30]}, unionCH=null}
12 05, 2014 6:10:53 午後 org.infinispan.topology.ClusterTopologyManagerImpl handleRebalanceCompleted
INFO: ISPN000328: Finished local rebalance for cache distCache on node node2-30832, topology id = 6
12 05, 2014 6:10:53 午後 org.infinispan.topology.ClusterTopologyManagerImpl handleRebalanceCompleted
INFO: ISPN000328: Finished local rebalance for cache distCache on node node1-13242, topology id = 6
12 05, 2014 6:10:53 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
INFO: ISPN000094: Received new cluster view for channel cluster: [node1-13242|3] (2) [node1-13242, node2-30832]

ここで、再度エントリの配置を確認してみます。

node1> locate
Locate:
 Key[key1]  Primary: node1-13242, Locate: [node1-13242, node2-30832]
 Key[key2]  Primary: node1-13242, Locate: [node1-13242, node2-30832]
 Key[key3]  Primary: node1-13242, Locate: [node1-13242, node2-30832]
 Key[key4]  Primary: node1-13242, Locate: [node1-13242, node2-30832]

ダウンしたNodeが持っていたデータが移動して、バックアップ分を含めて再配置されていることがわかります。

なかなか面白いです。

Replicated Mode

それでは、続いてReplicated Modeを試してみます。こちらもNodeを3つ起動します。

# Node 1
$ sbt "run node1 replCache"

# Node 2
$ sbt "run node2 replCache"

# Node 3
$ sbt "run node3 replCache"

とはいえ、途中までの実行結果はDistribution Modeと同じです。

異なるのは、エントリの配置状態を見る時になります。

node1> locate
Locate:
 Key[key1]  Primary: node3-3961, Locate: [node3-3961, node1-48304, node2-2117]
 Key[key2]  Primary: node3-3961, Locate: [node3-3961, node1-48304, node2-2117]
 Key[key3]  Primary: node3-3961, Locate: [node3-3961, node1-48304, node2-2117]
 Key[key4]  Primary: node1-48304, Locate: [node1-48304, node2-2117, node3-3961]

Distributed Modeの時と異なり、プライマリの概念はあるものの、全Nodeが同じデータを保持するようになります。

以上、簡単なInfinispanのクラスタリングご紹介でした。

今回作成したソースコードは、こちらに置いてあります。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-clustering-starter