CLOVER🍀

That was when it all began.

Infinispan(Hot Rod) × Apache Spark

Infinispan 8.0.0.Finalから、Apache Sparkへのコネクタが登場しました。

Infinispan: Infinispan Spark connector 0.1 released!

Infinispan: Infinispan Spark connector 0.2 released!

Spark Tutorial - Infinispan

GitHubリポジトリは、こちら。

GitHub - infinispan/infinispan-spark: Infinispan Spark Connector

Infinispan Spark Connector?

で、これは何かというと、文字通りInfinispanが提供するApache Sparkと統合する機能なのですが、Infinispanの形態のいずれと統合するかというと、RemoteCache(Hot Rod)のようです。

RemoteCacheに対する、RDD、DStreamを提供します(Ver 0.2時点)。

使ってみる

というわけで、早速使ってみます。

Infinispan Spark Connectorの対応バージョンは、現在のInfinispan 8.1.0.FinalでApache Spark 1.5系となります(最新は1.6系)。

Compatibility

Scalaバージョンは2.10系、2.11系とありますが、Apache Sparkの対応状況を考えると、2.10系を選ぶのがいいのかな…

準備

そんな感じなので、まずはbuild.sbtを定義。依存関係は、以下のようにしました。

libraryDependencies ++= Seq(
  "org.infinispan" %% "infinispan-spark" % "0.2",
  "org.apache.spark" %% "spark-core" % "1.5.2",
  "org.apache.spark" %% "spark-streaming" % "1.5.2",
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

Sparkへの依存関係は、自分で追加する必要があります。今回は、Spark Streamingも使用するので(Coreは明示的に、程度の意味ですが)、依存関係に加えてあります。

ScalaTestは、テストコードで使用します。

テストコードの雛形は、こんな感じ。
src/test/scala/org/littlewings/infinispan/spark/InfinispanSparkConnectorSpec.scala

package org.littlewings.infinispan.spark

import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, Duration, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder
import org.infinispan.client.hotrod.exceptions.HotRodClientException
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller
import org.infinispan.client.hotrod.{RemoteCache, RemoteCacheManager, Search}
import org.infinispan.protostream.annotations.ProtoSchemaBuilder
import org.infinispan.query.dsl.Query
import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants
import org.infinispan.spark._
import org.infinispan.spark.rdd.InfinispanRDD
import org.infinispan.spark.stream.InfinispanInputDStream
import org.scalatest.{FunSpec, Matchers}

class InfinispanSparkConnectorSpec extends FunSpec with Matchers {
  describe("Infinispan Spartk Connector Spec") {
    // ここに、テストを書く!
  }
}

他に、いくつかヘルパーメソッドを定義します。

まずはRemoteCacheを使うためのメソッド。

  protected def withRemoteCache[K, V](cacheName: String, useProtoStream: Boolean = false)(f: RemoteCache[K, V] => Unit): Unit = {
    val configurationBuilder = new ConfigurationBuilder
    configurationBuilder.addServers("localhost:11222")

    if (useProtoStream) {
      configurationBuilder.marshaller(new ProtoStreamMarshaller)
    }

    val manager = new RemoteCacheManager(configurationBuilder.build)

    val cache = manager.getCache[K, V](cacheName)

    try {
      f(cache)
      cache.clear()
    } finally {
      cache.stop()
      manager.stop()
    }
  }

引数を変えると、ProtoStreamMarshallerを設定に加えるようになっています。この点は、後述。いつも必要なわけではありません。

あと、SparkContextを準備するメソッド。

  protected def withSpark(f: SparkContext => Unit): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Infinispan Spark Connector Test")
    val sc = new SparkContext(conf)

    try {
      f(sc)
    } finally {
      sc.stop()
    }
  }

これらを使って、テストコードを組んでいきます。

Infinispan Serverはダウンロードしてきておいて、あらかじめ起動しておきます。

$ infinispan-server-8.1.0.Final/bin/standalone.sh -c clustered.xml

ここまでで、準備完了。

あとは、こちらを見ながらコードを書いていきます。

Basic usage

InfinispanRDDを使う

まずは、初歩的な感じでRDDから。

こちらに沿って、コードを書きます。

Creating an RDD

こんな感じ。

    it("simple InfinispanRDD") {
      withRemoteCache[String, SimpleBook]("namedCache") { cache =>
        SimpleBook.sourceBooks.foreach(b => cache.put(b.isbn, b))

        withSpark { sc =>
          val properties = new Properties
          properties.setProperty("infinispan.client.hotrod.server_list", "localhost:11222")
          properties.setProperty("infinispan.rdd.cacheName", cache.getName)

          val rdd = new InfinispanRDD[String, SimpleBook](sc, properties)

          rdd.values.map(_.price).sum.toInt should be(24171)
        }
      }
    }

ここで、SimpleBookとはシリアライズ可能なこんな定義です。
src/test/scala/org/littlewings/infinispan/spark/SimpleBook.scala

package org.littlewings.infinispan.spark

object SimpleBook {
  val sourceBooks: Seq[SimpleBook] =
    Array(
      SimpleBook("978-4798042169",
        "わかりやすいJavaEEウェブシステム入門",
        3456,
        "JavaEE7準拠。ショッピングサイトや業務システムで使われるJavaEE学習書の決定版!"),
      SimpleBook("978-4798124605",
        "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava",
        4410,
        "エンタープライズJava入門書の決定版!Java EE 6は、大規模な情報システム構築に用いられるエンタープライズ環境向けのプログラミング言語です。"),
      SimpleBook("978-4774127804",
        "Apache Lucene 入門 〜Java・オープンソース・全文検索システムの構築",
        3200,
        "Luceneは全文検索システムを構築するためのJavaのライブラリです。Luceneを使えば,一味違う高機能なWebアプリケーションを作ることができます。"),
      SimpleBook("978-4774161631",
        "[改訂新版] Apache Solr入門 オープンソース全文検索エンジン",
        3780,
        "最新版Apaceh Solr Ver.4.5.1に対応するため大幅な書き直しと原稿の追加を行い、現在の開発環境に合わせて完全にアップデートしました。Apache Solrは多様なプログラミング言語に対応した全文検索エンジンです。"),
      SimpleBook("978-4048662024",
        "高速スケーラブル検索エンジン ElasticSearch Server",
        3024,
        "Apache Solrを超える全文検索エンジンとして注目を集めるElasticSearch Serverの日本初の解説書です。多くのサンプルを用いた実践的入門書になっています。"),
      SimpleBook("978-1933988177",
        "Lucene in Action",
        6301,
        "New edition of top-selling book on the new version of Lucene. the coreopen-source technology behind most full-text search and Intelligent Web applications.")
    )

  def apply(isbn: String, title: String, price: Int, summary: String): SimpleBook = {
    val book = new SimpleBook
    book.isbn = isbn
    book.title = title
    book.price = price
    book.summary = summary
    book
  }
}

@SerialVersionUID(1L)
class SimpleBook extends Serializable {
  var isbn: String = _
  var title: String = _
  var price: Int = _
  var summary: String = _
}

InfinispanRDDは、以下のようにjava.util.PropertiesとSparkContextを使用して生成します。

          val properties = new Properties
          properties.setProperty("infinispan.client.hotrod.server_list", "localhost:11222")
          properties.setProperty("infinispan.rdd.cacheName", cache.getName)

          val rdd = new InfinispanRDD[String, SimpleBook](sc, properties)

設定項目は、こちら。

Supported Configurations

見たらだいたいわかりますが、Hot Rod関係の設定がほとんどです。

実際、内部でRemoteCacheManagerおよびRemoteCacheを生成しています。
https://github.com/infinispan/infinispan-spark/blob/v0.2/src/main/scala/org/infinispan/spark/rdd/InfinispanRDD.scala#L31

InfinispanRDDを得た後は、RDDとしてコードを書けばいいようです。

          val rdd = new InfinispanRDD[String, SimpleBook](sc, properties)

          rdd.values.map(_.price).sum.toInt should be(24171)

これで、最初に別のコードで設定したデータを、RDDを使って演算しています。

        SimpleBook.sourceBooks.foreach(b => cache.put(b.isbn, b))

InfinispanInputDStreamを使う

続いては、DStreamです。

SparkStreamingを使用するので、ヘルパーメソッドを追加。

  protected def withStreaming(duration: Duration)(f: StreamingContext => Unit): Unit = {
    withSpark {
      sc =>
        val ssc = new StreamingContext(sc, duration)

        try {
          f(ssc)
        } finally {
          ssc.stop(true)
        }
    }
  }

こちらを参考に、
Creating a DStream

実装したコードはこんな感じ。

    it("use DStream") {
      withRemoteCache[String, SimpleBook]("namedCache") { cache =>
        withStreaming(Seconds(1)) { ssc =>
          val properties = new Properties
          properties.setProperty("infinispan.client.hotrod.server_list", "localhost:11222")
          properties.setProperty("infinispan.rdd.cacheName", cache.getName)

          val stream =
            new InfinispanInputDStream[String, SimpleBook](ssc, StorageLevel.MEMORY_ONLY, properties)

          stream.foreachRDD { rdd =>
            rdd.foreach(s => println(s"isbn: ${s._1}, title: ${s._2.title}, event: ${s._3}"))
          }

          ssc.start()

          TimeUnit.SECONDS.sleep(2)

          SimpleBook.sourceBooks.foreach(b => cache.put(b.isbn, b))

          TimeUnit.SECONDS.sleep(2)

          // ssc.awaitTermination()
        }
      }
    }

最初に、InfinispanRDDを生成してから、InfinispanInputDStreamを生成します。

          val stream =
            new InfinispanInputDStream[String, SimpleBook](ssc, StorageLevel.MEMORY_ONLY, properties)

あとは、DStreamとして実装していけばよいです。

InfinispanInputDStreamの実装

なんとなく予想してはいましたが、InfinispanInputDStreamの実体はListenerになります。
https://github.com/infinispan/infinispan-spark/blob/v0.2/src/main/scala/org/infinispan/spark/stream/InfinispanInputDStream.scala#L37

このためかと思いますが、StreamingContextを開始後すぐにRemoteCacheにデータを入れても反応がありませんでした。Listenerがちゃんと登録されて動き出す前に、データを入れてしまったんではないかと…。

          ssc.start()

          TimeUnit.SECONDS.sleep(2)

          SimpleBook.sourceBooks.foreach(b => cache.put(b.isbn, b))

          TimeUnit.SECONDS.sleep(2)

というわけで、開始後とりあえずスリープさせることにしました。

イベントの受信はStreamingContext生成時の時間で行われているように見えますが、一時的に保存してるのかな(StorageLevel.MEMORY_ONLY?)…このあたり、SparkのAPIをちゃんと見てないのでそのうち…。

          val stream =
            new InfinispanInputDStream[String, SimpleBook](ssc, StorageLevel.MEMORY_ONLY, properties)

FilteredInfinispanRDDを使う

クエリを使える、FilteredInfinispanRDD。RDDに対してクエリを適用し、フィルタリングされたRDDを取得してその後の処理を書くことができます。

Filtering by Query

で、ここでRemote Queryを使うことになります。なので、まずはインデキシングを有効にしたCacheを定義。

$ infinispan-server-8.1.0.Final/bin/ispn-cli.sh -c
[standalone@localhost:9990 /]

[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=indexingCache:add(start=EAGER,mode=SYNC,indexing=LOCAL,auto-config=true,indexing-properties={lucene_version=LUCENE_CURRENT})
{"outcome" => "success"}

[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=indexingCache:add(configuration=indexingCache)
{"outcome" => "success"}

最小設定のつもりです。この後、Infinispan Serverを再起動しておきます。

で、とりあえず普通にRDDとRemoteQueryを使おうとすると、実行に失敗します。

    it("Query InfinispanRDD, No ProtoBuffers") {
      withRemoteCache[String, SimpleBook]("namedCache") { cache =>
        SimpleBook.sourceBooks.foreach(b => cache.put(b.isbn, b))

        withSpark { sc =>
          val properties = new Properties
          properties.setProperty("infinispan.client.hotrod.server_list", "localhost:11222")
          properties.setProperty("infinispan.rdd.cacheName", cache.getName)

          val rdd = new InfinispanRDD[String, SimpleBook](sc, properties)

          val thrown =
            the[HotRodClientException] thrownBy
              Search
                .getQueryFactory(cache)
                .from(classOf[SimpleBook])
                .having("price")
                .gte(4000)
                .toBuilder
                .build

          thrown.getMessage should include("The cache manager must be configured with a ProtoStreamMarshaller")
        }
      }
    }

まあ、RemoteQueryを使うためには、他にも準備が必要なので…。

ここで、最初に用意しておいたRemoteCacheについてのヘルパーメソッドの引数を変えて

  protected def withRemoteCache[K, V](cacheName: String, useProtoStream: Boolean = false)(f: RemoteCache[K, V] => Unit): Unit = {
    val configurationBuilder = new ConfigurationBuilder
    configurationBuilder.addServers("localhost:11222")

    if (useProtoStream) {
      configurationBuilder.marshaller(new ProtoStreamMarshaller)
    }

    val manager = new RemoteCacheManager(configurationBuilder.build)

    val cache = manager.getCache[K, V](cacheName)

    try {
      f(cache)
      cache.clear()
    } finally {
      cache.stop()
      manager.stop()
    }
  }

ProtoStreamMarshallerが適用されるようにします。

    if (useProtoStream) {
      configurationBuilder.marshaller(new ProtoStreamMarshaller)
    }

Protocol BuffersのIDLも必要なのですが…

Supported Configurations

上記の「infinispan.rdd.query.proto.protofiles」と「infinispan.rdd.query.proto.marshallers」が該当します。

なのですが、Protofile(IDL)とMarshallerはアノテーションで自動生成することができます。

https://github.com/infinispan/infinispan/blob/8.1.0.Final/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/marshall/ProtoStreamMarshallerWithAnnotationsTest.java
https://github.com/infinispan/infinispan/blob/8.1.0.Final/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/query/RemoteQueryWithProtostreamAnnotationsTest.java#L37

で、今回実装したのはこちら。
src/test/scala/org/littlewings/infinispan/spark/Book.scala

package org.littlewings.infinispan.spark

import org.infinispan.protostream.annotations.{ProtoDoc, ProtoField, ProtoMessage}
import org.infinispan.protostream.descriptors.Type

object Book {
  val sourceBooks: Seq[Book] =
    Array(
      Book("978-4798042169",
        "わかりやすいJavaEEウェブシステム入門",
        3456,
        "JavaEE7準拠。ショッピングサイトや業務システムで使われるJavaEE学習書の決定版!"),
      Book("978-4798124605",
        "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava",
        4410,
        "エンタープライズJava入門書の決定版!Java EE 6は、大規模な情報システム構築に用いられるエンタープライズ環境向けのプログラミング言語です。"),
      Book("978-4774127804",
        "Apache Lucene 入門 〜Java・オープンソース・全文検索システムの構築",
        3200,
        "Luceneは全文検索システムを構築するためのJavaのライブラリです。Luceneを使えば,一味違う高機能なWebアプリケーションを作ることができます。"),
      Book("978-4774161631",
        "[改訂新版] Apache Solr入門 オープンソース全文検索エンジン",
        3780,
        "最新版Apaceh Solr Ver.4.5.1に対応するため大幅な書き直しと原稿の追加を行い、現在の開発環境に合わせて完全にアップデートしました。Apache Solrは多様なプログラミング言語に対応した全文検索エンジンです。"),
      Book("978-4048662024",
        "高速スケーラブル検索エンジン ElasticSearch Server",
        3024,
        "Apache Solrを超える全文検索エンジンとして注目を集めるElasticSearch Serverの日本初の解説書です。多くのサンプルを用いた実践的入門書になっています。"),
      Book("978-1933988177",
        "Lucene in Action",
        6301,
        "New edition of top-selling book on the new version of Lucene. the coreopen-source technology behind most full-text search and Intelligent Web applications.")
    )

  def apply(isbn: String, title: String, price: Int, summary: String): Book = {
    val book = new Book
    book.isbn = isbn
    book.title = title
    book.price = price
    book.summary = summary
    book
  }
}

@ProtoDoc("@Indexed")
@ProtoMessage(name = "Book")
class Book {
  var isbn: String = _
  var title: String = _
  var price: Int = _
  var summary: String = _

  @ProtoDoc("@IndexedField")
  @ProtoField(number = 1, name = "isbn", `type` = Type.STRING)
  def getIsbn: String = isbn

  def setIsbn(isbn: String): Unit = this.isbn = isbn

  @ProtoDoc("@IndexedField")
  @ProtoField(number = 2, name = "title", `type` = Type.STRING)
  def getTitle: String = title

  def setTitle(title: String): Unit = this.title = title

  @ProtoDoc("@IndexedField")
  @ProtoField(number = 3, name = "price", required = true, defaultValue = "0")
  def getPrice: Int = price

  def setPrice(price: Int): Unit = this.price = price

  @ProtoDoc("@IndexedField")
  @ProtoField(number = 4, name = "summary", `type` = Type.STRING)
  def getSummary: String = summary

  def setSummary(summary: String): Unit = this.summary = summary
}

Protocol Buffersと併用する場合は、Serializableである必要はありません。

現時点のIDLとMarshallerの自動生成機能では、主にプリミティブ系のフィールドの扱いにバグがあって、requiredを指定しないと値を設定してもデフォルト値で上書きされてしまいます。

  @ProtoDoc("@IndexedField")
  @ProtoField(number = 3, name = "price", required = true, defaultValue = "0")
  def getPrice: Int = price

これはProtoStreamにPull Requestしておいたので、次回のリリースできっと直ると思います…。

で、長くなりましたが最終的に書いたのはこんなコードです。

    it("Query InfinispanRDD") {
      withRemoteCache[String, Book]("indexingCache", true) { cache =>
        val manager = cache.getRemoteCacheManager

        val context = ProtoStreamMarshaller.getSerializationContext(manager)
        val protoSchemaBuilder = new ProtoSchemaBuilder
        val idl = protoSchemaBuilder
          .fileName(classOf[Book].getName)
          .addClass(classOf[Book])
          .build(context)

        val metaCache = manager.getCache[String, String](ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME)
        metaCache.put(classOf[Book].getName + ".proto", idl)

        Book.sourceBooks.foreach(b => cache.put(b.isbn, b))

        withSpark { sc =>
          val properties = new Properties
          properties.setProperty("infinispan.client.hotrod.server_list", "localhost:11222")
          properties.setProperty("infinispan.rdd.cacheName", cache.getName)

          val rdd = new InfinispanRDD[String, Book](sc, properties)

          val query: Query =
            Search
              .getQueryFactory(cache)
              .from(
                classOf[Book])
              .having("price")
              .gte(4000)
              .toBuilder
              .build

          val filteredRdd = rdd.filterByQuery[Book](query, classOf[Book])

          filteredRdd.values.map(_.price).sum.toInt should be(10711)
        }
      }
    }

ProtoStreamMarshallerは、あらかじめRemoteCacheManagerに適用しているものとします。

    if (useProtoStream) {
      configurationBuilder.marshaller(new ProtoStreamMarshaller)
    }

IDLを生成しつつ、Protocol Buffers用のメタデータCacheに登録。

        val context = ProtoStreamMarshaller.getSerializationContext(manager)
        val protoSchemaBuilder = new ProtoSchemaBuilder
        val idl = protoSchemaBuilder
          .fileName(classOf[Book].getName)
          .addClass(classOf[Book])
          .build(context)

        val metaCache = manager.getCache[String, String](ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME)
        metaCache.put(classOf[Book].getName + ".proto", idl)

あとは、RDDを生成した後に、クエリを適用してFilteredInfinispanRDDを取得後、RDDを使ったを行います。

        withSpark { sc =>
          val properties = new Properties
          properties.setProperty("infinispan.client.hotrod.server_list", "localhost:11222")
          properties.setProperty("infinispan.rdd.cacheName", cache.getName)

          val rdd = new InfinispanRDD[String, Book](sc, properties)

          val query: Query =
            Search
              .getQueryFactory(cache)
              .from(
                classOf[Book])
              .having("price")
              .gte(4000)
              .toBuilder
              .build

          val filteredRdd = rdd.filterByQuery[Book](query, classOf[Book])

          filteredRdd.values.map(_.price).sum.toInt should be(10711)
        }

ここで、Propertiesに「infinispan.rdd.query.proto.protofiles」と「infinispan.rdd.query.proto.marshallers」を指定せず、エントリの値となるEntityにProtoStreamのアノテーションを与えていた場合は、RDDが内部で使用するRemoteCacheManagerに対して、IDLとMarshallerが自動生成されて登録されます。
https://github.com/infinispan/infinispan-spark/blob/v0.2/src/main/scala/org/infinispan/spark/rdd/FilteredInfinispanRDD.scala#L31
https://github.com/infinispan/infinispan-spark/blob/v0.2/src/main/scala/org/infinispan/spark/rdd/FilteredInfinispanRDD.scala#L78

FilteredInfinispanRDDの実体

クエリを使うので、RemotQueryは絡んでいるのだろうとは思ったのですが、内部的にはInfinispanのIteratorで実現されています。

https://github.com/infinispan/infinispan-spark/blob/v0.2/src/main/scala/org/infinispan/spark/rdd/FilteredInfinispanRDD.scala#L45
https://github.com/infinispan/infinispan-spark/blob/v0.2/src/main/scala/org/infinispan/spark/rdd/InfinispanRDD.scala#L54

クエリをサーバー側に送って、フィルターがかかった状態のIteratorを取得しているみたいですね。

RDDへの書き出し

最後は、RDDからInfinispanへの書き出しです。

Write arbitrary key/value RDDs to Infinispan

以下のimport文を入れることで、RDDへの暗黙的な変換が追加され、Infinispanへの書き出しができるようになります。

import org.infinispan.spark._

サンプルコードは、こんな感じ。

    it("write to Infinispan") {
      withRemoteCache[String, SimpleBook]("namedCache") { cache =>
        withSpark { sc =>
          val properties = new Properties
          properties.setProperty("infinispan.client.hotrod.server_list", "localhost:11222")
          properties.setProperty("infinispan.rdd.cacheName", cache.getName)

          val simpleRdd = sc.parallelize(SimpleBook.sourceBooks.map(b => (b.isbn, b)))
          simpleRdd.writeToInfinispan(properties)

          cache should have size(6L)
          cache.get("978-4048662024").title should be("高速スケーラブル検索エンジン ElasticSearch Server")
        }
      }
    }

RDDに対して、writeToInfinispanというメソッドを使用することができるようになります。

          val simpleRdd = sc.parallelize(SimpleBook.sourceBooks.map(b => (b.isbn, b)))
          simpleRdd.writeToInfinispan(properties)

なお、この時のRDDへの型パラメーターの適用は、RDD[(K, V)]となっている必要があり、writeToInfinispanメソッドにはInfinispanRDDを作成した時と同じjava.util.Propertiesのインスタンスを渡す必要があります。
https://github.com/infinispan/infinispan-spark/blob/v0.2/src/main/scala/org/infinispan/spark/package.scala#L21

まとめ

Infinispanが提供する、Apache Spark Connectorを使ってみました。

まだバージョン0.2なので、どこまで使えるかは未知数ですが個人的にはApache Sparkにも興味はあるので、ちょっと追っていこうかなと思っています。

なお、RemoteQueryを使う時のIDLの自動生成でrequiredでないプリミティブなフィールドの値が、デフォルト値で上書きされる問題に1番ハマりました…。Spark Connector関係ないですけど…。

RemoteQuery、だいたいいつも何かしら苦労するんですよね。習熟度不足でしょうか…。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-spark-connector