2014/4/22 更新)
Grid File SystemのReadableChannelには、6.0.0.Finalで解決されるまでreadの結果が不正となり、このエントリで書いているプログラムでは無限ループになる問題がありました。
このエントリは、修正版で書き直しています。
ReadableGridFileChannel#read cycles with certain file size and chunk size
https://issues.jboss.org/browse/ISPN-3700
Infinispanの、Grid File Systemという機能を試してみました。
Grid File System
https://docs.jboss.org/author/display/ISPN/Grid+File+System
Infinispanのキャッシュ上で、ファイルシステムを作るというか、java.io.FileとかInputStream/OutputStream、そしてNIOのReadableByteChannelとWritableByteChannelが扱えるという面白い機能です。
ドキュメントによると、実験的機能だそうですけど、今でもそうなのかな?まあ、そう思うような事象にも当たったわけですが…。
Grid File Systemを使うには、InfinispanのCacheが2つ必要になります。ひとつはファイルの実データを管理するためのもの、もうひとつはファイルのメタデータを管理するためのものです。
つまり、最初にこういうコードが必要になります。
EmbeddedCacheManager manager = ...; Cache<String,byte[]> data = manager.getCache("実データ用キャッシュ"); Cache<String,GridFile.Metadata> metadata = manager.getCache("メタデータ用キャッシュ");
Cacheに適用する型パラメータも決まっています。しかし、なぜにインナークラスなのでしょう?
そして、この2つからGridFilesystemのインスタンスを作成します。
GridFilesystem fs = new GridFilesystem(data, metadata);
あとは、このGridFilesystemのインスタンスから、目的のクラスのインスタンスを取得して使います。
java.io.File
fs.getFile("パス");
java.io.InputStream
fs.getInput("パス");
java.io.OutputStream
fs.getOutput("パス");
java.nio.channel.ReadableByteChannel
fs.getReadableChannel("パス");
java.nio.channel.WritableByteChannel
fs.getWritableChannel("パス");
なお、この機能はクラスタリングにしなくても動作しますが、2つのキャッシュはそれぞれ通常以下のように設定するべきらしいです。
用途 | クラスタリングのモード |
---|---|
実データ | distribution |
メタデータ | replication |
メタデータは、ファイルの一覧と取ったりするために、わざわざRPCしたくないよねってことみたいです。実データは…スペースも食うしreplicationにしない方がいいでしょうね。
では、使ってみます。いつも通り、ここからコードがScalaに…。
まずは設定ファイル。最初はローカルキャッシュでやっていたのですが、一応最後はクラスタリングにして確認もしてみました。
src/main/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd" xmlns="urn:infinispan:config:6.0"> <global> <transport clusterName="infinispan-grid-filesystem"> <properties> <property name="configurationFile" value="jgroups.xml" /> </properties> </transport> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" /> <shutdown hookBehavior="REGISTER"/> </global> <namedCache name="dataCache"> <jmxStatistics enabled="true"/> <clustering mode="distribution"> <hash numOwners="2" /> <sync /> </clustering> </namedCache> <namedCache name="metaCache"> <jmxStatistics enabled="true" /> <clustering mode="replication" /> </namedCache> </infinispan>
JGroupsの設定は端折ります。
サンプルコード。
src/main/scala/InfinispanGridFilesystem.scala
import scala.util.Try import java.io.{BufferedReader, BufferedWriter, FileInputStream, InputStreamReader, OutputStreamWriter} import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import org.infinispan.io.{GridFile, GridFilesystem} import org.infinispan.manager.DefaultCacheManager object InfinispanGridFilesystem { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val dataCache = manager.getCache[String, Array[Byte]]("dataCache") val metaCache = manager.getCache[String, GridFile.Metadata]("metaCache") try { val filesystem = new GridFilesystem(dataCache, metaCache) useFile(filesystem) useOio(filesystem) useNio(filesystem) } finally { metaCache.stop() dataCache.stop() manager.stop() } } def useFile(filesystem: GridFilesystem): Unit = { // java.io.Fileを使ったサンプル } def useOio(filesystem: GridFilesystem): Unit = { // java.io.{InputStream, OutputStream}を使ったサンプル } def useNio(filesystem: GridFilesystem): Unit = { // java.nio.channel.{ReadableByteChannel, WritableByteChannel}を使ったサンプル } implicit class CloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal { // AutoCloseableに対して、foreachを使えるようにするためのラッパー } }
最後に余計なのが付いていますが…久々に書きたいコードが出たので。
サンプルとしては、2つのキャッシュとGridFilesystemのインスタンスを作成して、FileやInputStream/OutputStreamなどを順次使っていきます。
val manager = new DefaultCacheManager("infinispan.xml") val dataCache = manager.getCache[String, Array[Byte]]("dataCache") val metaCache = manager.getCache[String, GridFile.Metadata]("metaCache") try { val filesystem = new GridFilesystem(dataCache, metaCache) useFile(filesystem) useOio(filesystem) useNio(filesystem)
あと、Implicit Class兼Value Classを定義しています。
implicit class CloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal { def foreach(f: A => Unit): Unit = try { f(underlying) } finally { if (underlying != null) { underlying.close() } } }
これで、AutoCloseableインターフェースを実装しているクラスに対して、foreachを使えるようにしています。foreachを抜ける時には、自動的にクローズします。
では、Grid File SystemのAPIを順次使っていってみます。
java.io.File
今回作ったのは、以下のようなサンプルです。順次、
といったものを実行していっています。
def useFile(filesystem: GridFilesystem): Unit = { // ファイル作成 val file = filesystem.getFile("file.txt") println(file.exists) // => false file.createNewFile() println(file.exists) // => true println(file.getAbsolutePath) // => /file.txt println(Try(file.getCanonicalPath)) // => Failure(java.lang.UnsupportedOperationException: Not implemented) println(Try(file.toURI)) // => Failure(java.lang.UnsupportedOperationException: Not implemented) // ディレクトリ作成 val dirs = filesystem.getFile("/directory/subdirectoy/subsubdirectory") println(dirs.exists) // => false dirs.mkdirs() println(dirs.exists) // => true }
戻り値はjava.io.Fileのサブクラスとして返却されるので、Fileクラスが定義しているメソッドは、ある程度呼び出すことができます。Grid Filesystemに未存在のファイルを指定すると、状態としては未存在になりますが、ファイルとして作成したりディレクトリを作成することも可能です。
今回は相対パスでFileのインスタンスを取得しましたけれど、結局「/」からの指定と同じと受け取られているようです。
val file = filesystem.getFile("file.txt") println(file.getAbsolutePath) // => /file.txt
もちろん、「/」から指定しても全く問題ありません。
ただ、一部未サポートのメソッドがあるようで、しかもそれがJavadocからは判別不能です…。自分は、URIとかどうなるんだろう?という疑問で試してみたら、見事にUnsupportedOperationExceptionをもらいました。
また、InfinispanはJava 6で書かれているようなので、JDK 7にしか存在しないtoPathメソッドの呼び出しは成功はしますが、まず間違いなく正しく動いていないと思います…。というか、親の実装がそのまま動いているだけだと思いますので。
java.io.InputStream/OutputStream
では、続いてInputStream/OutputStream。ここでは、ローカルファイルをGrid File Systemに投入して、それを再度読み出すというサンプルにしています。扱っているのは、このプログラムのソースコードそのものです。
def useOio(filesystem: GridFilesystem): Unit = { for { // ローカルファイルから fis <- new FileInputStream("src/main/scala/InfinispanGridFilesystem.scala") isr <- new InputStreamReader(fis, "UTF-8") reader <- new BufferedReader(isr) // Grid Filesystemへ os <- filesystem.getOutput("/InfinispanGridFilesystem.scala") osw <- new OutputStreamWriter(os, "UTF-8") writer <- new BufferedWriter(osw) } { val chars = Iterator .continually(reader.read()) .takeWhile(_ != -1) .map(_.asInstanceOf[Char]) .toArray writer.write(chars, 0, chars.size) } for { // Grid Filesystemから読み出し is <- filesystem.getInput("/InfinispanGridFilesystem.scala") isr <- new InputStreamReader(is, "UTF-8") reader <- new BufferedReader(isr) } { val lines = Iterator .continually(reader.readLine()) .takeWhile(_ != null) .toList val max = lines.size.toString.size for ((line, index) <- lines.iterator.zipWithIndex) { val f = "%1$0" + max + "d: %2$s%n" printf(f, index, line) } } }
なんとなく、出力時には行番号を振ってみました。
あと、先にImplicit Class兼Value Classを定義しているので、FileInputStreamやBufferedWriterなどに対して、for式が使えるようになっています。はい、これは完全な蛇足です。
for { // Grid Filesystemから読み出し is <- filesystem.getInput("/InfinispanGridFilesystem.scala") isr <- new InputStreamReader(is, "UTF-8") reader <- new BufferedReader(isr) } {
foreachしか実装していないので、ここまでしかできませんが…。flatMapを実装するには、結果のクラスがまた必要になりそうなので、諦めました…。
で、話を戻して…
InputStreamやOutputStreamがGridFilesystemのインスタンスとなるだけで、あとは通常のjava.ioのクラスと同様に使えます。
InputStreamを取得する際にファイルが未存在だったり、OutputStreamで出力先を作成する時に親ディレクトリがなかったりすると、失敗するところも同じです。
java.nio.channel.ReadableByteChannel/WritableByteChannel
これはハマりました。特に、ReadableByteChannelで…。
とりあえず、こちらもjava.ioの例と同じようにローカルファイルをGrid File Systemにアップして、その中身を表示するものにしました。
def useNio(filesystem: GridFilesystem): Unit = { // ディレクトリ作成 filesystem.getFile("/sbt-configuration/").mkdir() for { // ローカルファイルから localReadChannel <- new FileInputStream("build.sbt").getChannel // Grid Filesystemへ gridWriteChannel <- filesystem.getWritableChannel("/sbt-configuration/build.sbt") } { // コピー localReadChannel.transferTo(0, localReadChannel.size, gridWriteChannel) } for (gridReadChannel <- filesystem.getReadableChannel("/sbt-configuration/build.sbt")) { val byteBuffer = ByteBuffer.allocate(512) val builder = new StringBuilder byteBuffer.clear() Iterator .continually(gridReadChannel.read(byteBuffer)) .takeWhile(len => (len != -1) || byteBuffer.position > 0) .foreach { len => byteBuffer.flip() val charBuffer = StandardCharsets.UTF_8.decode(byteBuffer) byteBuffer.compact() builder ++= charBuffer.toString } while (byteBuffer.hasRemaining) { val charBuffer = StandardCharsets.UTF_8.decode(byteBuffer) builder ++= charBuffer.toString } builder.lines.foreach(l => println(s"Nio: $l")) } }
ただ、今回はサブディレクトリの配下に書き出しています。
ローカルファイルをGrid File Systemに反映しているところには特に問題ありませんが、ハマったのは読み出し。
通常のNIOのChannelを使って読み出す時は、java.nio.channel.ReadableByteChannel#readの結果が-1になるまで繰り返すみたいなコードになると思いますが、このエントリを書いた当時のInfinispan 5.2では、これがなぜか-1になることがありませんでした…。
これは、Infinispan 6.0.0.Finalまでは、以下のようなバグがあったからのようです。
ReadableGridFileChannel#read cycles with certain file size and chunk size
https://issues.jboss.org/browse/ISPN-3700
結果、終了条件がいつまでもfalseなので、無限ループするプログラムに。
Iterator.continuallyとか小細工してるからいけないのかな?とか、そもそもReadableByteChannelとByteBufferの使い方が間違っているんじゃ?とか、いろいろ疑いましたがwhile文とかに書き直しても全く動きは変わりませんでした。ローカルモードとクラスタリングを切り替えても、変化なし。
それに、Channelの取得先をローカルファイルにすると0かどうかなんて判定しなくても、問題なく動作しましたし。
まあ後のバージョンで解決してよかったです。
最後に、作成したコード全体を載せておきます。
src/main/scala/InfinispanGridFilesystem.scala
import scala.util.Try import java.io.{BufferedReader, BufferedWriter, FileInputStream, InputStreamReader, OutputStreamWriter} import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import org.infinispan.io.{GridFile, GridFilesystem} import org.infinispan.manager.DefaultCacheManager object InfinispanGridFilesystem { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val dataCache = manager.getCache[String, Array[Byte]]("dataCache") val metaCache = manager.getCache[String, GridFile.Metadata]("metaCache") try { val filesystem = new GridFilesystem(dataCache, metaCache) useFile(filesystem) useOio(filesystem) useNio(filesystem) } finally { metaCache.stop() dataCache.stop() manager.stop() } } def useFile(filesystem: GridFilesystem): Unit = { // ファイル作成 val file = filesystem.getFile("file.txt") println(file.exists) // => false file.createNewFile() println(file.exists) // => true println(file.getAbsolutePath) // => /file.txt println(Try(file.getCanonicalPath)) // => Failure(java.lang.UnsupportedOperationException: Not implemented) println(Try(file.toURI)) // => Failure(java.lang.UnsupportedOperationException: Not implemented) // ディレクトリ作成 val dirs = filesystem.getFile("/directory/subdirectoy/subsubdirectory") println(dirs.exists) // => false dirs.mkdirs() println(dirs.exists) // => true } def useOio(filesystem: GridFilesystem): Unit = { for { // ローカルファイルから fis <- new FileInputStream("src/main/scala/InfinispanGridFilesystem.scala") isr <- new InputStreamReader(fis, "UTF-8") reader <- new BufferedReader(isr) // Grid Filesystemへ os <- filesystem.getOutput("/InfinispanGridFilesystem.scala") osw <- new OutputStreamWriter(os, "UTF-8") writer <- new BufferedWriter(osw) } { val chars = Iterator .continually(reader.read()) .takeWhile(_ != -1) .map(_.asInstanceOf[Char]) .toArray writer.write(chars, 0, chars.size) } for { // Grid Filesystemから読み出し is <- filesystem.getInput("/InfinispanGridFilesystem.scala") isr <- new InputStreamReader(is, "UTF-8") reader <- new BufferedReader(isr) } { val lines = Iterator .continually(reader.readLine()) .takeWhile(_ != null) .toList val max = lines.size.toString.size for ((line, index) <- lines.iterator.zipWithIndex) { val f = "%1$0" + max + "d: %2$s%n" printf(f, index, line) } } } def useNio(filesystem: GridFilesystem): Unit = { // ディレクトリ作成 filesystem.getFile("/sbt-configuration/").mkdir() for { // ローカルファイルから localReadChannel <- new FileInputStream("build.sbt").getChannel // Grid Filesystemへ gridWriteChannel <- filesystem.getWritableChannel("/sbt-configuration/build.sbt") } { // コピー localReadChannel.transferTo(0, localReadChannel.size, gridWriteChannel) } for (gridReadChannel <- filesystem.getReadableChannel("/sbt-configuration/build.sbt")) { val byteBuffer = ByteBuffer.allocate(512) val builder = new StringBuilder byteBuffer.clear() Iterator .continually(gridReadChannel.read(byteBuffer)) .takeWhile(len => (len != -1) || byteBuffer.position > 0) .foreach { len => byteBuffer.flip() val charBuffer = StandardCharsets.UTF_8.decode(byteBuffer) byteBuffer.compact() builder ++= charBuffer.toString } while (byteBuffer.hasRemaining) { val charBuffer = StandardCharsets.UTF_8.decode(byteBuffer) builder ++= charBuffer.toString } builder.lines.foreach(l => println(s"Nio: $l")) } } implicit class CloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal { def foreach(f: A => Unit): Unit = try { f(underlying) } finally { if (underlying != null) { underlying.close() } } } }