CLOVER🍀

That was when it all began.

InfinispanのCluster Listenerを試す

Infinispan 7.0から、Listenerにclusteredというものが設定できるようになっているのですが、これまでちょっと飛ばしていたのでここらで試してみることにしました。

Cache-level notifications

InfinispanのListenerはCacheとCacheManagerに対して使うことができますが、このうちCacheに対するListenerが今回の話の対象です。

Cluster Listernerとは?

簡単に言うと、どのNodeでイベントが発生しても通知を受け取ることができるListenerです。以下のような特徴を持ちます。

  • @CacheEntryCreated、@CacheEntryModified、@CacheEntryRemoved、@CacheEntryExpiredのみサポート
  • 上記以外のイベントは通知されない
  • イベント発生時のPre/Postのうち、PostのみがCluster Listenerに送られる

Infinispan 8.0.0.Finalで追加されたContinuous Queryは、このCluster Listenerの上に成り立っています。

とまあ、こんな感じですが、通常のCacheレベルのListenerと比べながら試してみたいと思います。

準備

sbtの設定は、以下の通り。infinispan-coreが依存関係にあれば大丈夫です。
build.sbt

name := "embedded-listeners"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.7"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "8.1.1.Final",
  "net.jcip" % "jcip-annotations" % "1.0" % "provided"
)

Infinispanの設定は、以下の通り。CacheはDistributed CacheとしてTTLを30秒、JGroupsはデフォルト設定を使うことにしました。
src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:8.1 http://www.infinispan.org/schemas/infinispan-config-8.1.xsd"
        xmlns="urn:infinispan:config:8.1">
    <jgroups>
        <stack-file name="udp" path="default-configs/default-jgroups-udp.xml"/>
    </jgroups>

    <cache-container default-cache="distCache" shutdown-hook="REGISTER">
        <transport stack="udp"/>
        <distributed-cache name="distCache">
            <expiration lifespan="30000"/>
        </distributed-cache>
    </cache-container>
</infinispan>

確認用プログラム

動作確認には、2種類のプログラムとListenerを用意することにしました。

まずは、Listenerから。
src/main/scala/org/littlewings/infinispan/listener/CacheListener.scala

package org.littlewings.infinispan.listener

import org.infinispan.notifications.Listener
import org.infinispan.notifications.cachelistener.annotation._
import org.infinispan.notifications.cachelistener.event.{CacheEntriesEvictedEvent, CacheEntryEvent}

trait CacheListener[K, V] {
  val name: String

  @CacheEntryCreated
  @CacheEntryModified
  @CacheEntryRemoved
  @CacheEntryExpired
  def handleEvent(event: CacheEntryEvent[K, V]): Unit = {
    println(s"[${getClass.getSimpleName}]:${name} event = ${event.getType}, isPre = ${event.isPre}, key = ${event.getKey}, value = ${event.getValue}")
  }

  @CacheEntriesEvicted
  def handleEvictedEvent(event: CacheEntriesEvictedEvent[K, V]): Unit = {
    println(s"[${getClass.getSimpleName}]:${name} event = ${event.getType}, isPre = ${event.isPre}, entries = ${event.getEntries}")
  }
}

@Listener
class LocalListener[K, V](val name: String) extends CacheListener[K, V]

@Listener(clustered = true)
class ClusteredListener[K, V](val name: String) extends CacheListener[K, V]

骨格実装はトレイトにまとめて、@CacheEntryCreated、@CacheEntryModified、@CacheEntryRemoved、@CacheEntryExpiredによる通知を受け取るメソッド、もうひとつ@CacheEntriesEvictedによる通知を受け取るメソッドを用意。

Listenerに名前を与えてわかりやすくするのと、イベント通知時に受け取った情報をある程度出力するようにしています。

    println(s"[${getClass.getSimpleName}]:${name} event = ${event.getType}, isPre = ${event.isPre}, key = ${event.getKey}, value = ${event.getValue}")

実体としては、clusteredをtrueにしたものと、そうでないものの2種類を実装しました。

@Listener
class LocalListener[K, V](val name: String) extends CacheListener[K, V]

@Listener(clustered = true)
class ClusteredListener[K, V](val name: String) extends CacheListener[K, V]

そして、Cacheを操作するプログラムとしては、簡単な対話形式のCUIプログラムと、単に浮いててもらうサーバーの2つを作成。

対話形式のプログラムはこちら。
src/main/scala/org/littlewings/infinispan/listener/Console.scala

package org.littlewings.infinispan.listener

import org.infinispan.manager.DefaultCacheManager

import scala.collection.JavaConverters._

object Console {
  def main(args: Array[String]): Unit = {
    System.setProperty("java.net.preferIPv4Stack", "true")

    val name :: mode :: Nil = args.toList

    val manager = new DefaultCacheManager("infinispan.xml")

    try {
      val cache = manager.getCache[String, String]("distCache")

      val listener =
        if (mode == "local") new LocalListener[String, String](name)
        else new ClusteredListener[String, String](name)

      cache.addListener(listener)

      Iterator
        .continually(System.console().readLine("> "))
        .filter(l => l != null && !l.isEmpty)
        .takeWhile(_ != "exit")
        .foreach { line =>
          line.split("\\s+").toList match {
            case "put" :: key :: value :: Nil =>
              cache.put(key, value)
              println(s"key:${key}, value = ${value}, putted.")
            case "get" :: key :: Nil =>
              println(s"get key = ${key} => value = ${cache.get(key)}")
            case "remove" :: key :: Nil =>
              cache.remove(key)
              println(s"removed key = ${key}")
            case "evict" :: key :: Nil =>
              cache.evict(key)
              println(s"evicted key = ${key}")
            case "all" :: Nil =>
              cache.entrySet.asScala.foreach(e => println(s"key = ${e.getKey}, value = ${e.getValue}"))
            case _ => println(s"unknown command[${line}]")
          }
        }
      cache.stop()
    } finally {
      manager.stop()
    }
  }
}

単に浮いててもらうサーバーは、こちら。
src/main/scala/org/littlewings/infinispan/listener/EmbeddedServer.scala

package org.littlewings.infinispan.listener

import java.io.{InputStreamReader, BufferedReader}
import java.nio.charset.StandardCharsets

import org.infinispan.manager.DefaultCacheManager

object EmbeddedServer {
  def main(args: Array[String]): Unit = {
    System.setProperty("java.net.preferIPv4Stack", "true")

    val name :: mode :: Nil = args.toList

    val manager = new DefaultCacheManager("infinispan.xml")

    try {
      val cache = manager.getCache[String, String]("distCache")

      val listener =
        if (mode == "local") new LocalListener[String, String](name)
        else new ClusteredListener[String, String](name)

      cache.addListener(listener)

      System.console().readLine("> Press Enter, stop!")

      cache.stop()
    } finally {
      manager.stop()
    }
  }
}

それぞれ、第1引数に便宜上の名前、第2引数にListenerの種類を取ります。第2引数に「local」を選んだ場合はデフォルト、それ以外はCluster Listenerとなります。

確認

それでは、全部で3つのNodeをまずは「local」で起動。

## CUIツール
$ sbt "runMain org.littlewings.infinispan.listener.Console console local"

## Node1
$ sbt "runMain org.littlewings.infinispan.listener.EmbeddedServer node1 local"

## Node2
$ sbt "runMain org.littlewings.infinispan.listener.EmbeddedServer node1 local"

3つのNodeが参加した、クラスタができあがります。

では、CUIツールでいくつか操作。

> put key1 value1
[LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = true, key = key1, value = null
[LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key1, value = value1
key:key1, value = value1, putted.
> put key1 value1-1
[LocalListener]:console event = CACHE_ENTRY_MODIFIED, isPre = true, key = key1, value = value1
[LocalListener]:console event = CACHE_ENTRY_MODIFIED, isPre = false, key = key1, value = value1-1
key:key1, value = value1-1, putted.
> put key2 value2
[LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = true, key = key2, value = null
[LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key2, value = value2
key:key2, value = value2, putted.
> put key3 value3
[LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = true, key = key3, value = null
[LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key3, value = value3
key:key3, value = value3, putted.
> put key3 value3-3
[LocalListener]:console event = CACHE_ENTRY_MODIFIED, isPre = true, key = key3, value = value3
[LocalListener]:console event = CACHE_ENTRY_MODIFIED, isPre = false, key = key3, value = value3-3
key:key3, value = value3-3, putted.
> put key4 value4
[LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = true, key = key4, value = null
[LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key4, value = value4
key:key4, value = value4, putted.
> remove key4  
[LocalListener]:console event = CACHE_ENTRY_REMOVED, isPre = true, key = key4, value = value4
[LocalListener]:console event = CACHE_ENTRY_REMOVED, isPre = false, key = key4, value = null
removed key = key4
> evict key1
[LocalListener]:console event = CACHE_ENTRY_EVICTED, isPre = false, entries = {key1=value1-1}
evicted key = key1

コンソール出力があるもの、ないものがありますね。

また、 しばらく待っていると有効期限切れになります。

[LocalListener]:console event = CACHE_ENTRY_EXPIRED, isPre = false, key = key2, value = value2
[LocalListener]:console event = CACHE_ENTRY_EXPIRED, isPre = false, key = key3, value = value3-3
[LocalListener]:console event = CACHE_ENTRY_EXPIRED, isPre = false, key = key1, value = value1-1

他のNodeだと、今回はこうなりました。

## Node1
[LocalListener]:node1 event = CACHE_ENTRY_CREATED, isPre = true, key = key2, value = null
[LocalListener]:node1 event = CACHE_ENTRY_CREATED, isPre = false, key = key2, value = value2
[LocalListener]:node1 event = CACHE_ENTRY_CREATED, isPre = true, key = key3, value = null
[LocalListener]:node1 event = CACHE_ENTRY_CREATED, isPre = false, key = key3, value = value3
[LocalListener]:node1 event = CACHE_ENTRY_MODIFIED, isPre = true, key = key3, value = value3
[LocalListener]:node1 event = CACHE_ENTRY_MODIFIED, isPre = false, key = key3, value = value3-3
[LocalListener]:node1 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key2, value = value2
[LocalListener]:node1 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key3, value = value3-3

## Node2
[LocalListener]:node2 event = CACHE_ENTRY_CREATED, isPre = false, key = key1, value = value1
[LocalListener]:node2 event = CACHE_ENTRY_MODIFIED, isPre = true, key = key1, value = value1
[LocalListener]:node2 event = CACHE_ENTRY_MODIFIED, isPre = false, key = key1, value = value1-1
[LocalListener]:node2 event = CACHE_ENTRY_CREATED, isPre = true, key = key4, value = null
[LocalListener]:node2 event = CACHE_ENTRY_CREATED, isPre = false, key = key4, value = value4
[LocalListener]:node2 event = CACHE_ENTRY_REMOVED, isPre = true, key = key4, value = value4
[LocalListener]:node2 event = CACHE_ENTRY_REMOVED, isPre = false, key = key4, value = null
[LocalListener]:node2 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key1, value = value1-1

イベントを受け取るListenerは、データを持っているNode(バックアップNode含む)となっているみたいですね。また、isPreの結果がtrueとfalseになっていることから、Pre/Post両方のイベントが受け取れていることがわかります。

それでは1度プログラムを終了し、今度はCluster Listenerとして起動してみます。

## CUIツール
$ sbt "runMain org.littlewings.infinispan.listener.Console console clustered"

## Node1
$ sbt "runMain org.littlewings.infinispan.listener.EmbeddedServer node1 clustered"

## Node2
$ sbt "runMain org.littlewings.infinispan.listener.EmbeddedServer node2 clustered"

先ほどと同じ操作を実行。

> put key1 value1
[ClusteredListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key1, value = value1
key:key1, value = value1, putted.
> put key1 value1-1
[ClusteredListener]:console event = CACHE_ENTRY_MODIFIED, isPre = false, key = key1, value = value1-1
key:key1, value = value1-1, putted.
> put key2 value2
[ClusteredListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key2, value = value2
key:key2, value = value2, putted.
> put key3 value3
[ClusteredListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key3, value = value3
key:key3, value = value3, putted.
> put key3 value3-3
[ClusteredListener]:console event = CACHE_ENTRY_MODIFIED, isPre = false, key = key3, value = value3-3
key:key3, value = value3-3, putted.
> put key4 value4
[ClusteredListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key4, value = value4
key:key4, value = value4, putted.
> remove key4  
[ClusteredListener]:console event = CACHE_ENTRY_REMOVED, isPre = false, key = key4, value = null
removed key = key4
> evict key1
evicted key = key1

出力される内容が、明らかに減りました。isPreの結果が全部falseになり、CACHE_ENTRY_EVICTEDイベントは発生しなくなっています。

こちらも、しばらく待っているとExpireします。

[ClusteredListener]:console event = CACHE_ENTRY_EXPIRED, isPre = false, key = key2, value = value2
[ClusteredListener]:console event = CACHE_ENTRY_EXPIRED, isPre = false, key = key1, value = value1-1
[ClusteredListener]:console event = CACHE_ENTRY_EXPIRED, isPre = false, key = key3, value = value3-3

残りのNodeの結果は、こちら。

## Node1
[ClusteredListener]:node1 event = CACHE_ENTRY_CREATED, isPre = false, key = key1, value = value1
[ClusteredListener]:node1 event = CACHE_ENTRY_MODIFIED, isPre = false, key = key1, value = value1-1
[ClusteredListener]:node1 event = CACHE_ENTRY_CREATED, isPre = false, key = key2, value = value2
[ClusteredListener]:node1 event = CACHE_ENTRY_CREATED, isPre = false, key = key3, value = value3
[ClusteredListener]:node1 event = CACHE_ENTRY_MODIFIED, isPre = false, key = key3, value = value3-3
[ClusteredListener]:node1 event = CACHE_ENTRY_CREATED, isPre = false, key = key4, value = value4
[ClusteredListener]:node1 event = CACHE_ENTRY_REMOVED, isPre = false, key = key4, value = null
[ClusteredListener]:node1 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key2, value = value2
[ClusteredListener]:node1 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key1, value = value1-1
[ClusteredListener]:node1 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key3, value = value3-3

## Node2
[ClusteredListener]:node2 event = CACHE_ENTRY_CREATED, isPre = false, key = key1, value = value1
[ClusteredListener]:node2 event = CACHE_ENTRY_MODIFIED, isPre = false, key = key1, value = value1-1
[ClusteredListener]:node2 event = CACHE_ENTRY_CREATED, isPre = false, key = key2, value = value2
[ClusteredListener]:node2 event = CACHE_ENTRY_CREATED, isPre = false, key = key3, value = value3
[ClusteredListener]:node2 event = CACHE_ENTRY_MODIFIED, isPre = false, key = key3, value = value3-3
[ClusteredListener]:node2 event = CACHE_ENTRY_CREATED, isPre = false, key = key4, value = value4
[ClusteredListener]:node2 event = CACHE_ENTRY_REMOVED, isPre = false, key = key4, value = null
[ClusteredListener]:node2 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key2, value = value2
[ClusteredListener]:node2 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key1, value = value1-1
[ClusteredListener]:node2 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key3, value = value3-3

Node間で、出力する内容に差がなくなりましたね。

Cluster Listenerの動作

こうなると、Cluster Listenerがどのように実装されているかが気になるところですが、Distributed Executorで実装されているようです。

イベントの発火元の一種。
https://github.com/infinispan/infinispan/blob/8.1.1.Final/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java#L302

で、Distributed Executorが起動して
https://github.com/infinispan/infinispan/blob/8.1.1.Final/core/src/main/java/org/infinispan/notifications/cachelistener/cluster/impl/BatchingClusterEventManagerImpl.java#L47
https://github.com/infinispan/infinispan/blob/8.1.1.Final/core/src/main/java/org/infinispan/notifications/cachelistener/cluster/impl/BatchingClusterEventManagerImpl.java#L90

またCacheNotifierImplに戻ってきます。
https://github.com/infinispan/infinispan/blob/8.1.1.Final/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java#L612

ちょっとぐるっと回ってる感じですが、なんとなくイメージはわかりました。

まとめ

Infinispan 7.0でCacheレベルのListenerに追加された、clusteredをtrue/falseにした場合の挙動を確認してみました。Cluster Listenerにした場合は、全Nodeのイベントを受け取るようになること、そうでない場合は該当のデータを持つ(キーが割り当てられた)NodeであればListenerが起動するということですね。

個人的には、復習も兼ねられましたと。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-listeners