Infinispan 8.0.0.Finalから、Apache Sparkへのコネクタが登場しました。
Infinispan: Infinispan Spark connector 0.1 released!
Infinispan: Infinispan Spark connector 0.2 released!
GitHub - infinispan/infinispan-spark: Infinispan Spark Connector
Infinispan Spark Connector?
で、これは何かというと、文字通りInfinispanが提供するApache Sparkと統合する機能なのですが、Infinispanの形態のいずれと統合するかというと、RemoteCache(Hot Rod)のようです。
RemoteCacheに対する、RDD、DStreamを提供します(Ver 0.2時点)。
使ってみる
というわけで、早速使ってみます。
Infinispan Spark Connectorの対応バージョンは、現在のInfinispan 8.1.0.FinalでApache Spark 1.5系となります(最新は1.6系)。
Scalaのバージョンは2.10系、2.11系とありますが、Apache Sparkの対応状況を考えると、2.10系を選ぶのがいいのかな…
準備
そんな感じなので、まずはbuild.sbtを定義。依存関係は、以下のようにしました。
libraryDependencies ++= Seq( "org.infinispan" %% "infinispan-spark" % "0.2", "org.apache.spark" %% "spark-core" % "1.5.2", "org.apache.spark" %% "spark-streaming" % "1.5.2", "org.scalatest" %% "scalatest" % "2.2.6" % "test" )
Sparkへの依存関係は、自分で追加する必要があります。今回は、Spark Streamingも使用するので(Coreは明示的に、程度の意味ですが)、依存関係に加えてあります。
ScalaTestは、テストコードで使用します。
テストコードの雛形は、こんな感じ。
src/test/scala/org/littlewings/infinispan/spark/InfinispanSparkConnectorSpec.scala
package org.littlewings.infinispan.spark import java.util.Properties import java.util.concurrent.TimeUnit import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, Duration, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} import org.infinispan.client.hotrod.configuration.ConfigurationBuilder import org.infinispan.client.hotrod.exceptions.HotRodClientException import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller import org.infinispan.client.hotrod.{RemoteCache, RemoteCacheManager, Search} import org.infinispan.protostream.annotations.ProtoSchemaBuilder import org.infinispan.query.dsl.Query import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants import org.infinispan.spark._ import org.infinispan.spark.rdd.InfinispanRDD import org.infinispan.spark.stream.InfinispanInputDStream import org.scalatest.{FunSpec, Matchers} class InfinispanSparkConnectorSpec extends FunSpec with Matchers { describe("Infinispan Spartk Connector Spec") { // ここに、テストを書く! } }
他に、いくつかヘルパーメソッドを定義します。
まずはRemoteCacheを使うためのメソッド。
protected def withRemoteCache[K, V](cacheName: String, useProtoStream: Boolean = false)(f: RemoteCache[K, V] => Unit): Unit = { val configurationBuilder = new ConfigurationBuilder configurationBuilder.addServers("localhost:11222") if (useProtoStream) { configurationBuilder.marshaller(new ProtoStreamMarshaller) } val manager = new RemoteCacheManager(configurationBuilder.build) val cache = manager.getCache[K, V](cacheName) try { f(cache) cache.clear() } finally { cache.stop() manager.stop() } }
引数を変えると、ProtoStreamMarshallerを設定に加えるようになっています。この点は、後述。いつも必要なわけではありません。
あと、SparkContextを準備するメソッド。
protected def withSpark(f: SparkContext => Unit): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("Infinispan Spark Connector Test") val sc = new SparkContext(conf) try { f(sc) } finally { sc.stop() } }
これらを使って、テストコードを組んでいきます。
Infinispan Serverはダウンロードしてきておいて、あらかじめ起動しておきます。
$ infinispan-server-8.1.0.Final/bin/standalone.sh -c clustered.xml
ここまでで、準備完了。
あとは、こちらを見ながらコードを書いていきます。
InfinispanRDDを使う
まずは、初歩的な感じでRDDから。
こちらに沿って、コードを書きます。
こんな感じ。
it("simple InfinispanRDD") { withRemoteCache[String, SimpleBook]("namedCache") { cache => SimpleBook.sourceBooks.foreach(b => cache.put(b.isbn, b)) withSpark { sc => val properties = new Properties properties.setProperty("infinispan.client.hotrod.server_list", "localhost:11222") properties.setProperty("infinispan.rdd.cacheName", cache.getName) val rdd = new InfinispanRDD[String, SimpleBook](sc, properties) rdd.values.map(_.price).sum.toInt should be(24171) } } }
ここで、SimpleBookとはシリアライズ可能なこんな定義です。
src/test/scala/org/littlewings/infinispan/spark/SimpleBook.scala
package org.littlewings.infinispan.spark object SimpleBook { val sourceBooks: Seq[SimpleBook] = Array( SimpleBook("978-4798042169", "わかりやすいJavaEEウェブシステム入門", 3456, "JavaEE7準拠。ショッピングサイトや業務システムで使われるJavaEE学習書の決定版!"), SimpleBook("978-4798124605", "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava", 4410, "エンタープライズJava入門書の決定版!Java EE 6は、大規模な情報システム構築に用いられるエンタープライズ環境向けのプログラミング言語です。"), SimpleBook("978-4774127804", "Apache Lucene 入門 〜Java・オープンソース・全文検索システムの構築", 3200, "Luceneは全文検索システムを構築するためのJavaのライブラリです。Luceneを使えば,一味違う高機能なWebアプリケーションを作ることができます。"), SimpleBook("978-4774161631", "[改訂新版] Apache Solr入門 オープンソース全文検索エンジン", 3780, "最新版Apaceh Solr Ver.4.5.1に対応するため大幅な書き直しと原稿の追加を行い、現在の開発環境に合わせて完全にアップデートしました。Apache Solrは多様なプログラミング言語に対応した全文検索エンジンです。"), SimpleBook("978-4048662024", "高速スケーラブル検索エンジン ElasticSearch Server", 3024, "Apache Solrを超える全文検索エンジンとして注目を集めるElasticSearch Serverの日本初の解説書です。多くのサンプルを用いた実践的入門書になっています。"), SimpleBook("978-1933988177", "Lucene in Action", 6301, "New edition of top-selling book on the new version of Lucene. the coreopen-source technology behind most full-text search and Intelligent Web applications.") ) def apply(isbn: String, title: String, price: Int, summary: String): SimpleBook = { val book = new SimpleBook book.isbn = isbn book.title = title book.price = price book.summary = summary book } } @SerialVersionUID(1L) class SimpleBook extends Serializable { var isbn: String = _ var title: String = _ var price: Int = _ var summary: String = _ }
InfinispanRDDは、以下のようにjava.util.PropertiesとSparkContextを使用して生成します。
val properties = new Properties properties.setProperty("infinispan.client.hotrod.server_list", "localhost:11222") properties.setProperty("infinispan.rdd.cacheName", cache.getName) val rdd = new InfinispanRDD[String, SimpleBook](sc, properties)
設定項目は、こちら。
見たらだいたいわかりますが、Hot Rod関係の設定がほとんどです。
実際、内部でRemoteCacheManagerおよびRemoteCacheを生成しています。
https://github.com/infinispan/infinispan-spark/blob/v0.2/src/main/scala/org/infinispan/spark/rdd/InfinispanRDD.scala#L31
InfinispanRDDを得た後は、RDDとしてコードを書けばいいようです。
val rdd = new InfinispanRDD[String, SimpleBook](sc, properties) rdd.values.map(_.price).sum.toInt should be(24171)
これで、最初に別のコードで設定したデータを、RDDを使って演算しています。
SimpleBook.sourceBooks.foreach(b => cache.put(b.isbn, b))
InfinispanInputDStreamを使う
続いては、DStreamです。
SparkStreamingを使用するので、ヘルパーメソッドを追加。
protected def withStreaming(duration: Duration)(f: StreamingContext => Unit): Unit = { withSpark { sc => val ssc = new StreamingContext(sc, duration) try { f(ssc) } finally { ssc.stop(true) } } }
こちらを参考に、
Creating a DStream
実装したコードはこんな感じ。
it("use DStream") { withRemoteCache[String, SimpleBook]("namedCache") { cache => withStreaming(Seconds(1)) { ssc => val properties = new Properties properties.setProperty("infinispan.client.hotrod.server_list", "localhost:11222") properties.setProperty("infinispan.rdd.cacheName", cache.getName) val stream = new InfinispanInputDStream[String, SimpleBook](ssc, StorageLevel.MEMORY_ONLY, properties) stream.foreachRDD { rdd => rdd.foreach(s => println(s"isbn: ${s._1}, title: ${s._2.title}, event: ${s._3}")) } ssc.start() TimeUnit.SECONDS.sleep(2) SimpleBook.sourceBooks.foreach(b => cache.put(b.isbn, b)) TimeUnit.SECONDS.sleep(2) // ssc.awaitTermination() } } }
最初に、InfinispanRDDを生成してから、InfinispanInputDStreamを生成します。
val stream = new InfinispanInputDStream[String, SimpleBook](ssc, StorageLevel.MEMORY_ONLY, properties)
あとは、DStreamとして実装していけばよいです。
InfinispanInputDStreamの実装
なんとなく予想してはいましたが、InfinispanInputDStreamの実体はListenerになります。
https://github.com/infinispan/infinispan-spark/blob/v0.2/src/main/scala/org/infinispan/spark/stream/InfinispanInputDStream.scala#L37
このためかと思いますが、StreamingContextを開始後すぐにRemoteCacheにデータを入れても反応がありませんでした。Listenerがちゃんと登録されて動き出す前に、データを入れてしまったんではないかと…。
ssc.start() TimeUnit.SECONDS.sleep(2) SimpleBook.sourceBooks.foreach(b => cache.put(b.isbn, b)) TimeUnit.SECONDS.sleep(2)
というわけで、開始後とりあえずスリープさせることにしました。
イベントの受信はStreamingContext生成時の時間で行われているように見えますが、一時的に保存してるのかな(StorageLevel.MEMORY_ONLY?)…このあたり、SparkのAPIをちゃんと見てないのでそのうち…。
val stream = new InfinispanInputDStream[String, SimpleBook](ssc, StorageLevel.MEMORY_ONLY, properties)
FilteredInfinispanRDDを使う
クエリを使える、FilteredInfinispanRDD。RDDに対してクエリを適用し、フィルタリングされたRDDを取得してその後の処理を書くことができます。
で、ここでRemote Queryを使うことになります。なので、まずはインデキシングを有効にしたCacheを定義。
$ infinispan-server-8.1.0.Final/bin/ispn-cli.sh -c [standalone@localhost:9990 /] [standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=indexingCache:add(start=EAGER,mode=SYNC,indexing=LOCAL,auto-config=true,indexing-properties={lucene_version=LUCENE_CURRENT}) {"outcome" => "success"} [standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=indexingCache:add(configuration=indexingCache) {"outcome" => "success"}
最小設定のつもりです。この後、Infinispan Serverを再起動しておきます。
で、とりあえず普通にRDDとRemoteQueryを使おうとすると、実行に失敗します。
it("Query InfinispanRDD, No ProtoBuffers") { withRemoteCache[String, SimpleBook]("namedCache") { cache => SimpleBook.sourceBooks.foreach(b => cache.put(b.isbn, b)) withSpark { sc => val properties = new Properties properties.setProperty("infinispan.client.hotrod.server_list", "localhost:11222") properties.setProperty("infinispan.rdd.cacheName", cache.getName) val rdd = new InfinispanRDD[String, SimpleBook](sc, properties) val thrown = the[HotRodClientException] thrownBy Search .getQueryFactory(cache) .from(classOf[SimpleBook]) .having("price") .gte(4000) .toBuilder .build thrown.getMessage should include("The cache manager must be configured with a ProtoStreamMarshaller") } } }
まあ、RemoteQueryを使うためには、他にも準備が必要なので…。
ここで、最初に用意しておいたRemoteCacheについてのヘルパーメソッドの引数を変えて
protected def withRemoteCache[K, V](cacheName: String, useProtoStream: Boolean = false)(f: RemoteCache[K, V] => Unit): Unit = { val configurationBuilder = new ConfigurationBuilder configurationBuilder.addServers("localhost:11222") if (useProtoStream) { configurationBuilder.marshaller(new ProtoStreamMarshaller) } val manager = new RemoteCacheManager(configurationBuilder.build) val cache = manager.getCache[K, V](cacheName) try { f(cache) cache.clear() } finally { cache.stop() manager.stop() } }
ProtoStreamMarshallerが適用されるようにします。
if (useProtoStream) { configurationBuilder.marshaller(new ProtoStreamMarshaller) }
Protocol BuffersのIDLも必要なのですが…
上記の「infinispan.rdd.query.proto.protofiles」と「infinispan.rdd.query.proto.marshallers」が該当します。
なのですが、Protofile(IDL)とMarshallerはアノテーションで自動生成することができます。
https://github.com/infinispan/infinispan/blob/8.1.0.Final/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/marshall/ProtoStreamMarshallerWithAnnotationsTest.java
https://github.com/infinispan/infinispan/blob/8.1.0.Final/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/query/RemoteQueryWithProtostreamAnnotationsTest.java#L37
で、今回実装したのはこちら。
src/test/scala/org/littlewings/infinispan/spark/Book.scala
package org.littlewings.infinispan.spark import org.infinispan.protostream.annotations.{ProtoDoc, ProtoField, ProtoMessage} import org.infinispan.protostream.descriptors.Type object Book { val sourceBooks: Seq[Book] = Array( Book("978-4798042169", "わかりやすいJavaEEウェブシステム入門", 3456, "JavaEE7準拠。ショッピングサイトや業務システムで使われるJavaEE学習書の決定版!"), Book("978-4798124605", "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava", 4410, "エンタープライズJava入門書の決定版!Java EE 6は、大規模な情報システム構築に用いられるエンタープライズ環境向けのプログラミング言語です。"), Book("978-4774127804", "Apache Lucene 入門 〜Java・オープンソース・全文検索システムの構築", 3200, "Luceneは全文検索システムを構築するためのJavaのライブラリです。Luceneを使えば,一味違う高機能なWebアプリケーションを作ることができます。"), Book("978-4774161631", "[改訂新版] Apache Solr入門 オープンソース全文検索エンジン", 3780, "最新版Apaceh Solr Ver.4.5.1に対応するため大幅な書き直しと原稿の追加を行い、現在の開発環境に合わせて完全にアップデートしました。Apache Solrは多様なプログラミング言語に対応した全文検索エンジンです。"), Book("978-4048662024", "高速スケーラブル検索エンジン ElasticSearch Server", 3024, "Apache Solrを超える全文検索エンジンとして注目を集めるElasticSearch Serverの日本初の解説書です。多くのサンプルを用いた実践的入門書になっています。"), Book("978-1933988177", "Lucene in Action", 6301, "New edition of top-selling book on the new version of Lucene. the coreopen-source technology behind most full-text search and Intelligent Web applications.") ) def apply(isbn: String, title: String, price: Int, summary: String): Book = { val book = new Book book.isbn = isbn book.title = title book.price = price book.summary = summary book } } @ProtoDoc("@Indexed") @ProtoMessage(name = "Book") class Book { var isbn: String = _ var title: String = _ var price: Int = _ var summary: String = _ @ProtoDoc("@IndexedField") @ProtoField(number = 1, name = "isbn", `type` = Type.STRING) def getIsbn: String = isbn def setIsbn(isbn: String): Unit = this.isbn = isbn @ProtoDoc("@IndexedField") @ProtoField(number = 2, name = "title", `type` = Type.STRING) def getTitle: String = title def setTitle(title: String): Unit = this.title = title @ProtoDoc("@IndexedField") @ProtoField(number = 3, name = "price", required = true, defaultValue = "0") def getPrice: Int = price def setPrice(price: Int): Unit = this.price = price @ProtoDoc("@IndexedField") @ProtoField(number = 4, name = "summary", `type` = Type.STRING) def getSummary: String = summary def setSummary(summary: String): Unit = this.summary = summary }
Protocol Buffersと併用する場合は、Serializableである必要はありません。
現時点のIDLとMarshallerの自動生成機能では、主にプリミティブ系のフィールドの扱いにバグがあって、requiredを指定しないと値を設定してもデフォルト値で上書きされてしまいます。
@ProtoDoc("@IndexedField") @ProtoField(number = 3, name = "price", required = true, defaultValue = "0") def getPrice: Int = price
これはProtoStreamにPull Requestしておいたので、次回のリリースできっと直ると思います…。
で、長くなりましたが最終的に書いたのはこんなコードです。
it("Query InfinispanRDD") { withRemoteCache[String, Book]("indexingCache", true) { cache => val manager = cache.getRemoteCacheManager val context = ProtoStreamMarshaller.getSerializationContext(manager) val protoSchemaBuilder = new ProtoSchemaBuilder val idl = protoSchemaBuilder .fileName(classOf[Book].getName) .addClass(classOf[Book]) .build(context) val metaCache = manager.getCache[String, String](ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME) metaCache.put(classOf[Book].getName + ".proto", idl) Book.sourceBooks.foreach(b => cache.put(b.isbn, b)) withSpark { sc => val properties = new Properties properties.setProperty("infinispan.client.hotrod.server_list", "localhost:11222") properties.setProperty("infinispan.rdd.cacheName", cache.getName) val rdd = new InfinispanRDD[String, Book](sc, properties) val query: Query = Search .getQueryFactory(cache) .from( classOf[Book]) .having("price") .gte(4000) .toBuilder .build val filteredRdd = rdd.filterByQuery[Book](query, classOf[Book]) filteredRdd.values.map(_.price).sum.toInt should be(10711) } } }
ProtoStreamMarshallerは、あらかじめRemoteCacheManagerに適用しているものとします。
if (useProtoStream) { configurationBuilder.marshaller(new ProtoStreamMarshaller) }
IDLを生成しつつ、Protocol Buffers用のメタデータCacheに登録。
val context = ProtoStreamMarshaller.getSerializationContext(manager) val protoSchemaBuilder = new ProtoSchemaBuilder val idl = protoSchemaBuilder .fileName(classOf[Book].getName) .addClass(classOf[Book]) .build(context) val metaCache = manager.getCache[String, String](ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME) metaCache.put(classOf[Book].getName + ".proto", idl)
あとは、RDDを生成した後に、クエリを適用してFilteredInfinispanRDDを取得後、RDDを使ったを行います。
withSpark { sc => val properties = new Properties properties.setProperty("infinispan.client.hotrod.server_list", "localhost:11222") properties.setProperty("infinispan.rdd.cacheName", cache.getName) val rdd = new InfinispanRDD[String, Book](sc, properties) val query: Query = Search .getQueryFactory(cache) .from( classOf[Book]) .having("price") .gte(4000) .toBuilder .build val filteredRdd = rdd.filterByQuery[Book](query, classOf[Book]) filteredRdd.values.map(_.price).sum.toInt should be(10711) }
ここで、Propertiesに「infinispan.rdd.query.proto.protofiles」と「infinispan.rdd.query.proto.marshallers」を指定せず、エントリの値となるEntityにProtoStreamのアノテーションを与えていた場合は、RDDが内部で使用するRemoteCacheManagerに対して、IDLとMarshallerが自動生成されて登録されます。
https://github.com/infinispan/infinispan-spark/blob/v0.2/src/main/scala/org/infinispan/spark/rdd/FilteredInfinispanRDD.scala#L31
https://github.com/infinispan/infinispan-spark/blob/v0.2/src/main/scala/org/infinispan/spark/rdd/FilteredInfinispanRDD.scala#L78
FilteredInfinispanRDDの実体
クエリを使うので、RemotQueryは絡んでいるのだろうとは思ったのですが、内部的にはInfinispanのIteratorで実現されています。
https://github.com/infinispan/infinispan-spark/blob/v0.2/src/main/scala/org/infinispan/spark/rdd/FilteredInfinispanRDD.scala#L45
https://github.com/infinispan/infinispan-spark/blob/v0.2/src/main/scala/org/infinispan/spark/rdd/InfinispanRDD.scala#L54
RDDへの書き出し
最後は、RDDからInfinispanへの書き出しです。
Write arbitrary key/value RDDs to Infinispan
以下のimport文を入れることで、RDDへの暗黙的な変換が追加され、Infinispanへの書き出しができるようになります。
import org.infinispan.spark._
サンプルコードは、こんな感じ。
it("write to Infinispan") { withRemoteCache[String, SimpleBook]("namedCache") { cache => withSpark { sc => val properties = new Properties properties.setProperty("infinispan.client.hotrod.server_list", "localhost:11222") properties.setProperty("infinispan.rdd.cacheName", cache.getName) val simpleRdd = sc.parallelize(SimpleBook.sourceBooks.map(b => (b.isbn, b))) simpleRdd.writeToInfinispan(properties) cache should have size(6L) cache.get("978-4048662024").title should be("高速スケーラブル検索エンジン ElasticSearch Server") } } }
RDDに対して、writeToInfinispanというメソッドを使用することができるようになります。
val simpleRdd = sc.parallelize(SimpleBook.sourceBooks.map(b => (b.isbn, b)))
simpleRdd.writeToInfinispan(properties)
なお、この時のRDDへの型パラメーターの適用は、RDD[(K, V)]となっている必要があり、writeToInfinispanメソッドにはInfinispanRDDを作成した時と同じjava.util.Propertiesのインスタンスを渡す必要があります。
https://github.com/infinispan/infinispan-spark/blob/v0.2/src/main/scala/org/infinispan/spark/package.scala#L21
まとめ
Infinispanが提供する、Apache Spark Connectorを使ってみました。
まだバージョン0.2なので、どこまで使えるかは未知数ですが個人的にはApache Sparkにも興味はあるので、ちょっと追っていこうかなと思っています。
なお、RemoteQueryを使う時のIDLの自動生成でrequiredでないプリミティブなフィールドの値が、デフォルト値で上書きされる問題に1番ハマりました…。Spark Connector関係ないですけど…。
RemoteQuery、だいたいいつも何かしら苦労するんですよね。習熟度不足でしょうか…。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-spark-connector