CLOVER🍀

That was when it all began.

InfinispanでGrid File System

2014/4/22 更新)
Grid File SystemのReadableChannelには、6.0.0.Finalで解決されるまでreadの結果が不正となり、このエントリで書いているプログラムでは無限ループになる問題がありました。
このエントリは、修正版で書き直しています。

ReadableGridFileChannel#read cycles with certain file size and chunk size
https://issues.jboss.org/browse/ISPN-3700

Infinispanの、Grid File Systemという機能を試してみました。

Grid File System
https://docs.jboss.org/author/display/ISPN/Grid+File+System

Infinispanのキャッシュ上で、ファイルシステムを作るというか、java.io.FileとかInputStream/OutputStream、そしてNIOのReadableByteChannelとWritableByteChannelが扱えるという面白い機能です。

ドキュメントによると、実験的機能だそうですけど、今でもそうなのかな?まあ、そう思うような事象にも当たったわけですが…。

Grid File Systemを使うには、InfinispanのCacheが2つ必要になります。ひとつはファイルの実データを管理するためのもの、もうひとつはファイルのメタデータを管理するためのものです。

つまり、最初にこういうコードが必要になります。

EmbeddedCacheManager manager = ...;
Cache<String,byte[]> data = manager.getCache("実データ用キャッシュ");
Cache<String,GridFile.Metadata> metadata = manager.getCache("メタデータ用キャッシュ");

Cacheに適用する型パラメータも決まっています。しかし、なぜにインナークラスなのでしょう?

そして、この2つからGridFilesystemのインスタンスを作成します。

GridFilesystem fs = new GridFilesystem(data, metadata);

あとは、このGridFilesystemのインスタンスから、目的のクラスのインスタンスを取得して使います。

java.io.File

fs.getFile("パス");

java.io.InputStream

fs.getInput("パス");

java.io.OutputStream

fs.getOutput("パス");

java.nio.channel.ReadableByteChannel

fs.getReadableChannel("パス");

java.nio.channel.WritableByteChannel

fs.getWritableChannel("パス");

なお、この機能はクラスタリングにしなくても動作しますが、2つのキャッシュはそれぞれ通常以下のように設定するべきらしいです。

用途 クラスタリングのモード
実データ distribution
メタデータ replication

メタデータは、ファイルの一覧と取ったりするために、わざわざRPCしたくないよねってことみたいです。実データは…スペースも食うしreplicationにしない方がいいでしょうね。

では、使ってみます。いつも通り、ここからコードがScalaに…。

まずは設定ファイル。最初はローカルキャッシュでやっていたのですが、一応最後はクラスタリングにして確認もしてみました。
src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"
    xmlns="urn:infinispan:config:6.0">

  <global>
    <transport clusterName="infinispan-grid-filesystem">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>
    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        />

    <shutdown hookBehavior="REGISTER"/>
  </global>

  <namedCache name="dataCache">
    <jmxStatistics enabled="true"/>
    <clustering mode="distribution">
      <hash numOwners="2" />
      <sync />
    </clustering>
  </namedCache>

  <namedCache name="metaCache">
    <jmxStatistics enabled="true" />
    <clustering mode="replication" />
  </namedCache>
</infinispan>

JGroupsの設定は端折ります。

サンプルコード。
src/main/scala/InfinispanGridFilesystem.scala

import scala.util.Try

import java.io.{BufferedReader, BufferedWriter, FileInputStream, InputStreamReader, OutputStreamWriter}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import org.infinispan.io.{GridFile, GridFilesystem}
import org.infinispan.manager.DefaultCacheManager

object InfinispanGridFilesystem {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")

    val dataCache = manager.getCache[String, Array[Byte]]("dataCache")
    val metaCache = manager.getCache[String, GridFile.Metadata]("metaCache")

    try {
      val filesystem = new GridFilesystem(dataCache, metaCache)

      useFile(filesystem)
      useOio(filesystem)
      useNio(filesystem)
    } finally {
      metaCache.stop()
      dataCache.stop()
      manager.stop()
    }
  }

  def useFile(filesystem: GridFilesystem): Unit = {
    // java.io.Fileを使ったサンプル
  }

  def useOio(filesystem: GridFilesystem): Unit = {
    // java.io.{InputStream, OutputStream}を使ったサンプル
  }

  def useNio(filesystem: GridFilesystem): Unit = {
    // java.nio.channel.{ReadableByteChannel, WritableByteChannel}を使ったサンプル
  }

  implicit class CloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    // AutoCloseableに対して、foreachを使えるようにするためのラッパー
  }
}

最後に余計なのが付いていますが…久々に書きたいコードが出たので。

サンプルとしては、2つのキャッシュとGridFilesystemのインスタンスを作成して、FileやInputStream/OutputStreamなどを順次使っていきます。

    val manager = new DefaultCacheManager("infinispan.xml")

    val dataCache = manager.getCache[String, Array[Byte]]("dataCache")
    val metaCache = manager.getCache[String, GridFile.Metadata]("metaCache")

    try {
      val filesystem = new GridFilesystem(dataCache, metaCache)

      useFile(filesystem)
      useOio(filesystem)
      useNio(filesystem)

あと、Implicit Class兼Value Classを定義しています。

  implicit class CloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    def foreach(f: A => Unit): Unit =
      try {
        f(underlying)
      } finally {
        if (underlying != null) {
          underlying.close()
        }
      }
  }

これで、AutoCloseableインターフェースを実装しているクラスに対して、foreachを使えるようにしています。foreachを抜ける時には、自動的にクローズします。

では、Grid File SystemのAPIを順次使っていってみます。

java.io.File

今回作ったのは、以下のようなサンプルです。順次、

  • Fileの取得
  • Fileの存在確認
  • 新規ファイルの作成
  • Fileの存在確認
  • 絶対パスの確認
  • Canonicalパスの確認と、URIへの変換(失敗する)
  • ディレクトリの作成

といったものを実行していっています。

  def useFile(filesystem: GridFilesystem): Unit = {
    // ファイル作成
    val file = filesystem.getFile("file.txt")

    println(file.exists) // => false

    file.createNewFile()

    println(file.exists) // => true

    println(file.getAbsolutePath) // => /file.txt
    println(Try(file.getCanonicalPath)) // => Failure(java.lang.UnsupportedOperationException: Not implemented)
    println(Try(file.toURI)) // => Failure(java.lang.UnsupportedOperationException: Not implemented)

    // ディレクトリ作成
    val dirs = filesystem.getFile("/directory/subdirectoy/subsubdirectory")
    println(dirs.exists)  // => false

    dirs.mkdirs()

    println(dirs.exists)  // => true
  }

戻り値はjava.io.Fileのサブクラスとして返却されるので、Fileクラスが定義しているメソッドは、ある程度呼び出すことができます。Grid Filesystemに未存在のファイルを指定すると、状態としては未存在になりますが、ファイルとして作成したりディレクトリを作成することも可能です。

今回は相対パスでFileのインスタンスを取得しましたけれど、結局「/」からの指定と同じと受け取られているようです。

    val file = filesystem.getFile("file.txt")
    println(file.getAbsolutePath) // => /file.txt

もちろん、「/」から指定しても全く問題ありません。

ただ、一部未サポートのメソッドがあるようで、しかもそれがJavadocからは判別不能です…。自分は、URIとかどうなるんだろう?という疑問で試してみたら、見事にUnsupportedOperationExceptionをもらいました。

また、InfinispanはJava 6で書かれているようなので、JDK 7にしか存在しないtoPathメソッドの呼び出しは成功はしますが、まず間違いなく正しく動いていないと思います…。というか、親の実装がそのまま動いているだけだと思いますので。

java.io.InputStream/OutputStream

では、続いてInputStream/OutputStream。ここでは、ローカルファイルをGrid File Systemに投入して、それを再度読み出すというサンプルにしています。扱っているのは、このプログラムのソースコードそのものです。

  def useOio(filesystem: GridFilesystem): Unit = {
    for {
      // ローカルファイルから
      fis <- new FileInputStream("src/main/scala/InfinispanGridFilesystem.scala")
      isr <- new InputStreamReader(fis, "UTF-8")
      reader <- new BufferedReader(isr)

      // Grid Filesystemへ
      os <- filesystem.getOutput("/InfinispanGridFilesystem.scala")
      osw <- new OutputStreamWriter(os, "UTF-8")
      writer <- new BufferedWriter(osw)
    } {
      val chars =
        Iterator
          .continually(reader.read())
          .takeWhile(_ != -1)
          .map(_.asInstanceOf[Char])
          .toArray
      writer.write(chars, 0, chars.size)
    }

    for {
      // Grid Filesystemから読み出し
      is <- filesystem.getInput("/InfinispanGridFilesystem.scala")
      isr <- new InputStreamReader(is, "UTF-8")
      reader <- new BufferedReader(isr)
    } {
      val lines =
        Iterator
          .continually(reader.readLine())
          .takeWhile(_ != null)
          .toList
      val max = lines.size.toString.size
      
      for ((line, index) <- lines.iterator.zipWithIndex) {
        val f = "%1$0" + max + "d: %2$s%n"
        printf(f, index, line)
      }
    }
  }

なんとなく、出力時には行番号を振ってみました。

あと、先にImplicit Class兼Value Classを定義しているので、FileInputStreamやBufferedWriterなどに対して、for式が使えるようになっています。はい、これは完全な蛇足です。

    for {
      // Grid Filesystemから読み出し
      is <- filesystem.getInput("/InfinispanGridFilesystem.scala")
      isr <- new InputStreamReader(is, "UTF-8")
      reader <- new BufferedReader(isr)
    } {

foreachしか実装していないので、ここまでしかできませんが…。flatMapを実装するには、結果のクラスがまた必要になりそうなので、諦めました…。

で、話を戻して…

InputStreamやOutputStreamがGridFilesystemのインスタンスとなるだけで、あとは通常のjava.ioのクラスと同様に使えます。
InputStreamを取得する際にファイルが未存在だったり、OutputStreamで出力先を作成する時に親ディレクトリがなかったりすると、失敗するところも同じです。

java.nio.channel.ReadableByteChannel/WritableByteChannel

これはハマりました。特に、ReadableByteChannelで…。

とりあえず、こちらもjava.ioの例と同じようにローカルファイルをGrid File Systemにアップして、その中身を表示するものにしました。

  def useNio(filesystem: GridFilesystem): Unit = {
    // ディレクトリ作成
    filesystem.getFile("/sbt-configuration/").mkdir()

    for {
      // ローカルファイルから
      localReadChannel <- new FileInputStream("build.sbt").getChannel

      // Grid Filesystemへ
      gridWriteChannel <- filesystem.getWritableChannel("/sbt-configuration/build.sbt")
    } {
      // コピー
      localReadChannel.transferTo(0, localReadChannel.size, gridWriteChannel)
    }

    for (gridReadChannel <- filesystem.getReadableChannel("/sbt-configuration/build.sbt")) {
      val byteBuffer = ByteBuffer.allocate(512)
      val builder = new StringBuilder

      byteBuffer.clear()

      Iterator
        .continually(gridReadChannel.read(byteBuffer))
        .takeWhile(len => (len != -1) || byteBuffer.position > 0)
        .foreach { len =>
          byteBuffer.flip()
          val charBuffer = StandardCharsets.UTF_8.decode(byteBuffer)
          byteBuffer.compact()

          builder ++= charBuffer.toString
        }

      while (byteBuffer.hasRemaining) {
        val charBuffer = StandardCharsets.UTF_8.decode(byteBuffer)
        builder ++= charBuffer.toString
      }

      builder.lines.foreach(l => println(s"Nio: $l"))
    }
  }

ただ、今回はサブディレクトリの配下に書き出しています。

ローカルファイルをGrid File Systemに反映しているところには特に問題ありませんが、ハマったのは読み出し。

通常のNIOのChannelを使って読み出す時は、java.nio.channel.ReadableByteChannel#readの結果が-1になるまで繰り返すみたいなコードになると思いますが、このエントリを書いた当時のInfinispan 5.2では、これがなぜか-1になることがありませんでした…。

これは、Infinispan 6.0.0.Finalまでは、以下のようなバグがあったからのようです。

ReadableGridFileChannel#read cycles with certain file size and chunk size
https://issues.jboss.org/browse/ISPN-3700

結果、終了条件がいつまでもfalseなので、無限ループするプログラムに。

Iterator.continuallyとか小細工してるからいけないのかな?とか、そもそもReadableByteChannelとByteBufferの使い方が間違っているんじゃ?とか、いろいろ疑いましたがwhile文とかに書き直しても全く動きは変わりませんでした。ローカルモードとクラスタリングを切り替えても、変化なし。
それに、Channelの取得先をローカルファイルにすると0かどうかなんて判定しなくても、問題なく動作しましたし。

まあ後のバージョンで解決してよかったです。

最後に、作成したコード全体を載せておきます。
src/main/scala/InfinispanGridFilesystem.scala

import scala.util.Try

import java.io.{BufferedReader, BufferedWriter, FileInputStream, InputStreamReader, OutputStreamWriter}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import org.infinispan.io.{GridFile, GridFilesystem}
import org.infinispan.manager.DefaultCacheManager

object InfinispanGridFilesystem {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")

    val dataCache = manager.getCache[String, Array[Byte]]("dataCache")
    val metaCache = manager.getCache[String, GridFile.Metadata]("metaCache")

    try {
      val filesystem = new GridFilesystem(dataCache, metaCache)

      useFile(filesystem)
      useOio(filesystem)
      useNio(filesystem)
    } finally {
      metaCache.stop()
      dataCache.stop()
      manager.stop()
    }
  }

  def useFile(filesystem: GridFilesystem): Unit = {
    // ファイル作成
    val file = filesystem.getFile("file.txt")

    println(file.exists) // => false

    file.createNewFile()

    println(file.exists) // => true

    println(file.getAbsolutePath) // => /file.txt
    println(Try(file.getCanonicalPath)) // => Failure(java.lang.UnsupportedOperationException: Not implemented)
    println(Try(file.toURI)) // => Failure(java.lang.UnsupportedOperationException: Not implemented)

    // ディレクトリ作成
    val dirs = filesystem.getFile("/directory/subdirectoy/subsubdirectory")
    println(dirs.exists)  // => false

    dirs.mkdirs()

    println(dirs.exists)  // => true
  }

  def useOio(filesystem: GridFilesystem): Unit = {
    for {
      // ローカルファイルから
      fis <- new FileInputStream("src/main/scala/InfinispanGridFilesystem.scala")
      isr <- new InputStreamReader(fis, "UTF-8")
      reader <- new BufferedReader(isr)

      // Grid Filesystemへ
      os <- filesystem.getOutput("/InfinispanGridFilesystem.scala")
      osw <- new OutputStreamWriter(os, "UTF-8")
      writer <- new BufferedWriter(osw)
    } {
      val chars =
        Iterator
          .continually(reader.read())
          .takeWhile(_ != -1)
          .map(_.asInstanceOf[Char])
          .toArray
      writer.write(chars, 0, chars.size)
    }

    for {
      // Grid Filesystemから読み出し
      is <- filesystem.getInput("/InfinispanGridFilesystem.scala")
      isr <- new InputStreamReader(is, "UTF-8")
      reader <- new BufferedReader(isr)
    } {
      val lines =
        Iterator
          .continually(reader.readLine())
          .takeWhile(_ != null)
          .toList
      val max = lines.size.toString.size
      
      for ((line, index) <- lines.iterator.zipWithIndex) {
        val f = "%1$0" + max + "d: %2$s%n"
        printf(f, index, line)
      }
    }
  }

  def useNio(filesystem: GridFilesystem): Unit = {
    // ディレクトリ作成
    filesystem.getFile("/sbt-configuration/").mkdir()

    for {
      // ローカルファイルから
      localReadChannel <- new FileInputStream("build.sbt").getChannel

      // Grid Filesystemへ
      gridWriteChannel <- filesystem.getWritableChannel("/sbt-configuration/build.sbt")
    } {
      // コピー
      localReadChannel.transferTo(0, localReadChannel.size, gridWriteChannel)
    }

    for (gridReadChannel <- filesystem.getReadableChannel("/sbt-configuration/build.sbt")) {
      val byteBuffer = ByteBuffer.allocate(512)
      val builder = new StringBuilder

      byteBuffer.clear()

      Iterator
        .continually(gridReadChannel.read(byteBuffer))
        .takeWhile(len => (len != -1) || byteBuffer.position > 0)
        .foreach { len =>
          byteBuffer.flip()
          val charBuffer = StandardCharsets.UTF_8.decode(byteBuffer)
          byteBuffer.compact()

          builder ++= charBuffer.toString
        }

      while (byteBuffer.hasRemaining) {
        val charBuffer = StandardCharsets.UTF_8.decode(byteBuffer)
        builder ++= charBuffer.toString
      }

      builder.lines.foreach(l => println(s"Nio: $l"))
    }
  }

  implicit class CloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    def foreach(f: A => Unit): Unit =
      try {
        f(underlying)
      } finally {
        if (underlying != null) {
          underlying.close()
        }
      }
  }
}