CLOVER🍀

That was when it all began.

HazelcastのMapStoreを試してみる

Twitterで、「Hazelcastを家で使ってるけど、サーバを再起動するから、データはDBで持っていた方がいいな」みたいなツイートを見かけまして。

簡単に済むのなら、毎回ロードするでいいのでは?と思いつつも、もうひとつの解としては何らかのストアにエントリを保存するもんだよね?と思い、そういえばHazelcastのMapStoreの仕組みって無視してたなぁ〜ということで、試してみることにしました。

Hazelcastでエントリを何らかのストアに保存するには、Distributed MapのMapStoreの機能を使用するようです。
*他のデータ構造に対しては、一応QueueStoreも定義されているようですが、Listなどはなさそう?

2.1.3. Persistence
http://www.hazelcast.org/docs/latest/manual/html-single/#MapPersistence

ドキュメントは設定ファイルベースで書かれていますが、Javaのクラスで設定を組み上げることも可能です。が、自分は設定ファイルで書く方が好みなのでこちらでいきます。

MapStoreを使用するには、「com.hazelcast.core.MapStore」インターフェースを実装したクラスを作成し、Distributed Mapの定義に登録します。もしくは、実装したMapStoreのインスタンスを生成する、「com.hazelcast.core.MapStoreFactory」インターフェースを実装したファクトリクラスを、Distributed Mapに登録して、このファクトリ経由でMapStoreのインスタンスを作成するようにします。

MapStoreインターフェースは、MapLoaderインターフェースを拡張しており、以下のようなインターフェースを備えます。

  • delete
  • deleteAll
  • store
  • storeAll
  • load
  • loadAll
  • loadAllKeys

だいたい、名前で単一エントリのロード、保存、削除か、エントリの集合なのかはわかりますね。

MapStoreを使用する定義は、定義は、こんな感じです。

    <map name="default">
        ...
        <!-- enabledをtrueにする -->
        <map-store enabled="true">
            <!-- 登録する対象に応じて、class-nameかfactory-class-nameのどちらかを使用する -->

            <!-- class-nameを使用する場合は、MapStoreインターフェースの実装クラスの名前を登録する -->
            <class-name>com.example.MyMapStore</class-name> -->

            <!-- factory-class-nameを使用する場合は、MapStoreFactoryインターフェースの実装クラスの名前を登録する -->
            <!--
            <factory-class-name>com.example.MyMapStoreFactory</factory-class-name>
            -->

            <!--
                write-delay-secondsを0にするとwrite-throughとなり、0より大きくするとwrite-behindとなる
                write-throughにした場合、エントリの保存時にMapStore.store(key, value)が呼び出される
                write-behindにした場合、write-delay-seconds秒後にstoreAllが呼び出される
            -->
            <write-delay-seconds>0</write-delay-seconds>
        </map-store>
    </map>

登録する定義に応じて、map-storeタグの配下にclass-nameまたはfactory-class-nameを指定してください。またh、write-delay-secondsはMapStoreへの書き込み遅延時間を表し、0だと即時にMapStore#storeが呼び出されます。つまり、Write-Throughです。0より大きな値にすると、指定秒数後にMapStore#storeAllが一括で呼び出されるようです。つまり、Write-Behindです。

では、説明をしたところで実際に使う…MapStoreの具体的な実装に入る前に、既存のMapStoreの実装から。

Hazelcastに標準で組み込まれているMapStoreの実装は、以下の2つがあるようです。

  • com.hazelcast.spring.jpa.JPAMapStore
  • com.hazelcast.spring.mongodb.MongoMapStore

保存先はいいとして、Springかぁ…今回はパスかなぁ。それに、単純にJDBCやファイルに保存するMapStoreってないんですね。需要ないのかなぁ?

では、先へ。

準備

まずは、依存関係の定義。
build.sbt

name := "hazelcast-mapstore"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.3"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked")

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "com.hazelcast" % "hazelcast" % "3.1.5",
  "org.scalatest" %% "scalatest" % "2.0" % "test"
)

ScalaTestを使ったテストコードで確認しますが、テストの並列実行はオフにしています。

MapStoreの実装

今回は、単にメモリ上にエントリを保存する、InMemoryMapStoreとそのファクトリクラス、InMemoryMapStoreFactoryをまずは作成します。その後で、かなり大雑把にファイルに保存する、EasyFileMapStoreを作成します。

まずは、InMemoryMapStoreから。
src/main/scala/org/littlewings/hazelcast/mapstore/InMemoryMapStore.scala

package org.littlewings.hazelcast.mapstore

import scala.collection._
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap

import java.util.Collection

import com.hazelcast.core.MapStore

class InMemoryMapStore extends MapStore[String, String] {
  val store: mutable.Map[String, String] = new TrieMap[String, String]

  /* MapStoreのメソッドを実装 */
  override def delete(key: String): Unit =
    store -= key

  override def deleteAll(keys: Collection[String]): Unit =
    store --= keys.asScala

  override def store(key: String, value: String): Unit =
    store += (key -> value)

  override def storeAll(map: java.util.Map[String, String]): Unit =
    store ++= map.asScala

  /* MapLoaderのメソッドを実装 */
  override def load(key: String): String =
    store.get(key).getOrElse(null)

  override def loadAll(keys: Collection[String]): java.util.Map[String, String] =
    keys
      .asScala
      .withFilter(k => store.contains(k))
      .map(k => (k -> store.get(k).getOrElse(null)))
      .toMap
      .asJava

  override def loadAllKeys: java.util.Set[String] =
    store.keySet.asJava
}

ストアの実装は、ScalaのTrieMapとしました。Concurrent系のMapですね。実装も、単純です。

これに対応する、ファクトリクラスがこちら。
src/main/scala/org/littlewings/hazelcast/mapstore/InMemoryMapStoreFactory.scala

package org.littlewings.hazelcast.mapstore

import com.hazelcast.core.{MapLoader, MapStoreFactory}

object InMemoryMapStoreFactory {
  val INSTANCE: InMemoryMapStore = new InMemoryMapStore
}

class InMemoryMapStoreFactory extends MapStoreFactory[String, String] {
  override def newMapStore(mapName: String, properties: java.util.Properties): MapLoader[String, String] =
    InMemoryMapStoreFactory.INSTANCE
}

ファクトリクラスは、MapStoreを作成する時にMapの名前とProperties(設定ファイルに、map-storeの配下にproperties/propertyを定義すればよさそう)を受け取り、この情報を元にMapStoreのインスタンスを返却します。

今回は、MapStoreがDistributed Mapの背後で呼び出されていることを確認するため、シングルトンとしました。

続いて、ファイルに保存するMapStoreの実装。
src/main/scala/org/littlewings/hazelcast/mapstore/EasyFileMapStore.scala

package org.littlewings.hazelcast.mapstore

import scala.collection._
import scala.collection.JavaConverters._

import java.io.{ObjectInputStream, ObjectOutputStream}
import java.nio.file.{Files, Paths}
import java.util.Collection
import java.util.concurrent.ConcurrentHashMap

import com.hazelcast.core.MapStore

class EasyFileMapStore extends MapStore[String, String] {
  var store: mutable.Map[String, String] = new mutable.HashMap

  /* MapStoreのメソッドを実装 */
  override def delete(key: String): Unit = synchronized {
    store -= key
    persistStore()
  }

  override def deleteAll(keys: Collection[String]): Unit = synchronized {
    keys.asScala.foreach(k => store -= k)
    persistStore()
  }

  override def store(key: String, value: String): Unit = synchronized {
    store += (key -> value)
    persistStore()
  }

  override def storeAll(map: java.util.Map[String, String]): Unit = synchronized {
    store ++= map.asScala
    persistStore()
  }

  /* MapLoaderのメソッドを実装 */
  override def load(key: String): String = synchronized {
    loadStore()
    store.get(key).getOrElse(null)
  }

  override def loadAll(keys: Collection[String]): java.util.Map[String, String] = synchronized {
    loadStore()
    keys
      .asScala
      .withFilter(k => store.contains(k))
      .map(k => (k -> store.get(k).getOrElse(null)))
      .toMap
      .asJava
  }

  override def loadAllKeys: java.util.Set[String] = synchronized {
    loadStore()
    store.keySet.asJava
  }

  private def persistStore(): Unit = {
    val out = new ObjectOutputStream(Files.newOutputStream(Paths.get("store.ser")))
    try {
      out.writeObject(store)
    } finally {
      out.close()
    }
  }

  private def loadStore(): Unit = {
    val path = Paths.get("store.ser")
    if (Files.exists(path)) {
      val in = new ObjectInputStream(Files.newInputStream(path))
      try {
        store = in.readObject().asInstanceOf[mutable.Map[String, String]]
      } finally {
        in.close()
      }
    }
  }
}

エントリを全部Mapに持ち、変更、読み込みの度に一括でファイルにシリアライズしたりデシリアライズする、ものすごく大雑把な実装です。

性能などまるで考えていませんが、とりあえず永続化のサンプルということで…。

設定

テストコードでHazelcastを動かすものの、作成したMapStoreを登録しなければ始まりません。

今回は、このような設定ファイルを用意しました。
src/test/resources/hazelcast.xml

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.1.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <group>
    <!-- クラスタを分ける場合は、クラスタごとに名前を設定する -->
    <name>my-cluster</name>
    <password>my-cluster-password</password>
  </group>

  <network>
    <!-- auto-incrementをfalseにした場合は、各クラスタメンバーに
         portを個別に指定する -->
    <port auto-increment="true">5701</port>
    <join>
      <multicast enabled="true">
        <multicast-group>224.2.2.3</multicast-group>
        <multicast-port>54327</multicast-port>
      </multicast>
      <tcp-ip enabled="false" />
    </join>
  </network>

  <!-- IMapのデフォルトの設定 -->
  <map name="default">
    <backup-count>1</backup-count>
    <time-to-live-seconds>0</time-to-live-seconds>
    <max-idle-seconds>0</max-idle-seconds>
    <eviction-policy>NONE</eviction-policy>
    <max-size policy="PER_NODE">0</max-size>
  </map>

  <map name="in-memory-mapstore-map">
    <backup-count>1</backup-count>
    <time-to-live-seconds>0</time-to-live-seconds>
    <max-idle-seconds>0</max-idle-seconds>
    <eviction-policy>NONE</eviction-policy>
    <max-size policy="PER_NODE">0</max-size>

    <map-store enabled="true">
      <!-- <class-name>org.littlewings.hazelcast.mapstore.InMemoryMapStore</class-name> -->
      <factory-class-name>org.littlewings.hazelcast.mapstore.InMemoryMapStoreFactory</factory-class-name>
      <!--
          write-delay-secondsを0にするとwrite-throughとなり、0より大きくするとwrite-behindとなる
          write-throughにした場合、エントリの保存時にMapStore.store(key, value)が呼び出される
          write-behindにした場合、write-delay-seconds秒後にstoreAllが呼び出される
      -->
      <write-delay-seconds>0</write-delay-seconds>
    </map-store>
  </map>

  <map name="easy-file-mapstore-map">
    <backup-count>1</backup-count>
    <time-to-live-seconds>0</time-to-live-seconds>
    <max-idle-seconds>0</max-idle-seconds>
    <eviction-policy>NONE</eviction-policy>
    <max-size policy="PER_NODE">0</max-size>

    <map-store enabled="true">
      <class-name>org.littlewings.hazelcast.mapstore.EasyFileMapStore</class-name>
      <write-delay-seconds>0</write-delay-seconds>
    </map-store>
  </map>

  <properties>
    <property name="hazelcast.memcache.enabled">false</property>
  </properties>
</hazelcast>

デフォルトのDistributed Mapの設定も適当にしましたが、「in-memory-mapstore-map」にはMapStoreFactoryを登録し、「easy-file-mapstore-map」にはMapStoreを直接登録しました。

テストを書く

それでは、今回作成したMapStoreに対するテストコードを書きます。

Hazelcastは頻繁に起動/停止することになるため、以下のようなトレイトを作成して共通化します。
src/test/scala/org/littlewings/hazelcast/mapstore/HazelcastSpecSupport.scala

package org.littlewings.hazelcast.mapstore

import com.hazelcast.config.ClasspathXmlConfig
import com.hazelcast.core.{Hazelcast, HazelcastInstance}

import org.scalatest.{BeforeAndAfterAll, Suite}

trait HazelcastSpecSupport extends BeforeAndAfterAll {
  this: Suite =>

  protected def withHazelcast[T](fun: HazelcastInstance => T): T = {
    val hazelcast =
      Hazelcast.newHazelcastInstance(new ClasspathXmlConfig("hazelcast.xml"))

    try {
      fun(hazelcast)
    } finally {
      hazelcast.getLifecycleService.shutdown()
    }
  }

  override def afterAll(): Unit =
    Hazelcast.shutdownAll()
}

これをMix-inしたテストクラスを作成し、機能を確認していきます。

まずは、デフォルトとして定義したDistributed Map。要は、MapStoreなしです。
src/test/scala/org/littlewings/hazelcast/mapstore/NoMapStoreSpec.scala

package org.littlewings.hazelcast.mapstore

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class NoMapStoreSpec extends FunSpec with HazelcastSpecSupport {
  describe("no map-store spec") {
    it("start put down, size") {
      withHazelcast { hazelcast =>
        val map = hazelcast.getMap[String, String]("default")

        (1 to 3).foreach(i => map.put(s"key$i", s"value$i"))

        map should have size 3
      }

      withHazelcast { hazelcast =>
        val map = hazelcast.getMap[String, String]("default")

        map should be ('empty)
      }
    }
  }
}

起動した時にDistriubted Mapにデータを登録しますが、停止後、再度Hazelcastインスタンスを起動しても、データはなくなっています。

続いて、InMemoryMapStoreを使ったテスト。
src/test/scala/org/littlewings/hazelcast/mapstore/InMemoryMapStoreSpec.scala

package org.littlewings.hazelcast.mapstore

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class InMemoryMapStoreSpec extends FunSpec with HazelcastSpecSupport {
  describe("in memory map-store spec") {
    it("simple") {
      withHazelcast { hazelcast =>
        val map = hazelcast.getMap("in-memory-mapstore-map")

        val instance = InMemoryMapStoreFactory.INSTANCE
        instance.store should be ('empty)
      }
    }

    it("put") {
      withHazelcast { hazelcast =>
        val map = hazelcast.getMap[String, String]("in-memory-mapstore-map")

        (1 to 3).foreach(i => map.put(s"key$i", s"value$i"))

        val instance = InMemoryMapStoreFactory.INSTANCE
        instance.store should have size 3
        instance.store should contain only (("key1" -> "value1"),
                                            ("key2" -> "value2"),
                                            ("key3" -> "value3"))
      }
    }

    it("put-remove") {
      withHazelcast { hazelcast =>
        val map = hazelcast.getMap[String, String]("in-memory-mapstore-map")

        (1 to 3).foreach(i => map.put(s"key$i", s"value$i"))

        val instance = InMemoryMapStoreFactory.INSTANCE
        instance.store should have size 3
        instance.store should contain only (("key1" -> "value1"),
                                            ("key2" -> "value2"),
                                            ("key3" -> "value3"))

        (1 to 3).foreach(i => map.remove(s"key$i"))

        instance.store should be ('empty)
      }
    }
  }
}

このテスト、本来はDistributed MapごとにMapStoreのインスタンスを生成すべきところを、シングルトンにしてしまっているのでJavaVMがシャットダウンするまでデータが消えません。そういう意味だと、少し違った意味になってしまっていますが…。

どちらかというと、Distributed Mapへの各操作が、背後にいるMapStoreにも反映されていることを確認するようなテストになってしまいました。

最後。ファイル保存するMapStoreを使用したテストです。
src/test/scala/org/littlewings/hazelcast/mapstore/EasyFileMapStoreSpec.scala

package org.littlewings.hazelcast.mapstore

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class EasyFileMapStoreSpec extends FunSpec with HazelcastSpecSupport {
  describe("easy file map-store spec") {
    it("clear") {
      withHazelcast { hazelcast =>
        val map = hazelcast.getMap[String, String]("easy-file-mapstore-map")

        map.clear()
      }
    }

    it("put-load") {
      withHazelcast { hazelcast =>
        val map = hazelcast.getMap[String, String]("easy-file-mapstore-map")

        (1 to 3).foreach(i => map.put(s"key$i", s"value$i"))
      }

      withHazelcast { hazelcast =>
        val map = hazelcast.getMap[String, String]("easy-file-mapstore-map")

        map.keySet should contain only ("key1", "key2", "key3")
        map.values should contain only ("value1", "value2", "value3")
      }
    }
  }
}

最初に起動した時はとりあえずクリアしていますが、その後データを登録してHazelcastを停止した後、再度起動したHazelcastのインスタンスから前回登録したデータが引き出せていることがわかります。

がまあ、この実装だと、ファイル名が固定なのでHazelcastインスタンスが複数起動すると困る、という問題がありまして、テストのタイミングによってはクラスタに同時に複数Hazelcastインスタンスが立ち上がってしまい、MapStoreがクラッシュするという目に遭遇しました…。

いずれにせよ不完全ですが、MapStoreの雰囲気だけでもわかればと。

今回作成したコードは、以下にアップしています。

https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-mapstore

それにしても、本家でFileMapStoreとかJdbcMapStoreとかの要望って上がらないんでしょうかね〜。