Infinispanは、キーとなるオブジェクトのハッシュ値で配置先のNodeを決定しますが、この挙動を変更するためのGrouping APIというものが存在します。
The Grouping API
https://docs.jboss.org/author/display/ISPN/The+Grouping+API
以前は、こういうAPIを許可してなかったみたいですね。
このAPIを使用すると、Infinispanはキーとなるオブジェクトのハッシュ値を無視して、グルーピングに使用する値のハッシュ値を使って配置先のNodeを決定するようになります。
Grouping APIが使用されている場合、全Nodeがキーの所有者となるNodeを計算できることが重要になります。このため、グループは手動で指定することができません。グループは、キーとなるクラスによって生成されたエントリそのものか、キークラスではなく外部関数によって生成された値によって決定されます。
というわけで、使ってみましょう。
準備
まずは、各種準備。
build.sbt
name := "infinispan-group-api" version := "0.0.1-SNAPSHOT" scalaVersion := "2.10.2" organization := "littlewings" fork in run := true resolvers += "JBoss Public Maven Repository Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss/" libraryDependencies ++= Seq( "org.infinispan" % "infinispan-core" % "5.3.0.Final", "net.jcip" % "jcip-annotations" % "1.0" )
src/main/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:5.3 http://www.infinispan.org/schemas/infinispan-config-5.3.xsd" xmlns="urn:infinispan:config:5.3"> <global> <transport clusterName="group-cluster"> <properties> <property name="configurationFile" value="jgroups.xml" /> </properties> </transport> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" /> </global> <namedCache name="cacheAsCustomizedKeyClass"> <jmxStatistics enabled="true" /> <clustering mode="distribution"> <hash numOwners="2"> <groups enabled="true" /> </hash> </clustering> </namedCache> <namedCache name="cacheAsGrouper"> <jmxStatistics enabled="true" /> <clustering mode="distribution"> <hash numOwners="2"> <groups enabled="true"> <grouper class="MyGrouper" /> </groups> </hash> </clustering> </namedCache> <namedCache name="cacheAsGrouperSimple"> <jmxStatistics enabled="true" /> <clustering mode="distribution"> <hash numOwners="2"> <groups enabled="true"> <grouper class="SimpleGrouper" /> </groups> </hash> </clustering> </namedCache> </infinispan>
Cacheは、用途に応じて3つ用意しました。
JGroupsの設定は、端折ります。
お馴染み、浮いててもらうためのサーバ。
src/main/scala/EmbeddedCacheServer.scala
import org.infinispan.manager.DefaultCacheManager object EmbeddedCacheServer { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val cacheAsCustomizedKeyClass = manager.getCache[Any, Any]("cacheAsCustomizedKeyClass") val cacheAsGrouper = manager.getCache[NoGroupKeyClass, NoGroupKeyClass]("cacheAsGrouper") val cacheAsGrouperSimple = manager.getCache[String, String]("cacheAsGrouperSimple") } }
メインとなるプログラム。
src/main/scala/InfinispanGroupApi.scala
import java.util.Objects import org.infinispan.distribution.group.{Group, Grouper} import org.infinispan.manager.DefaultCacheManager object InfinispanGroupApi { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val cacheAsCustomizedKeyClass = manager.getCache[KeyClass, KeyClass]("cacheAsCustomizedKeyClass") val cacheAsGrouper = manager.getCache[NoGroupKeyClass, NoGroupKeyClass]("cacheAsGrouper") val cacheAsGrouperSimple = manager.getCache[String, String]("cacheAsGrouperSimple") try { /** キー、値の登録と分散状態の表示 **/ } finally { cacheAsCustomizedKeyClass.stop() cacheAsGrouper.stop() cacheAsGrouperSimple.stop() manager.stop() } } }
あとは、それぞれの使い方を見ていきましょう。
キークラス自身にグループを決定させる
キークラス自身に、グループの判断基準となる情報を生成させる方法です。キークラスを自分で作成できる場合などは、こちらを使います。
Infinispanの設定としては、
<namedCache name="cacheAsCustomizedKeyClass"> <jmxStatistics enabled="true" /> <clustering mode="distribution"> <hash numOwners="2"> <groups enabled="true" /> </hash> </clustering> </namedCache>
のところで、
<groups enabled="true" />
と書きます。
続いて、キークラス側ではキーを返却するメソッドにGroupアノテーションを付与します。今回は、コード、名前、年齢というなんか人っぽいクラスを作成しましたが、使っているうちに使用方法がテキトーになっていきました…。
@SerialVersionUID(1L) class KeyClass(val code: String, val name: String, val age: Int) extends Serializable { @Group def getCode: String = code override def equals(other: Any): Boolean = other match { case o: KeyClass => code == o.code && name == o.name && age == o.age case _ => false } override def hashCode: Int = Objects.hashCode(code, name, age) override def toString: String = s"KeyClass => code:[$code], name:[$name], age:[$age]" } object KeyClass { def apply(code: String, name: String, age: Int): KeyClass = new KeyClass(code, name, age) }
Groupアノテーションを付与しているのは、ここですね。codeをグルーピングのキーとしています。
class KeyClass(val code: String, val name: String, val age: Int) extends Serializable { @Group def getCode: String = code
なお、グループの値はStringである必要がありますが、Groupアノテーションを付与する対象はメソッドであればよく、getterである必要はありません。
では、キーと値をCacheに放り込んでみます。
val groupKeyValues = Array(KeyClass("10000", "Sato", 20), KeyClass("10000", "Tanaka", 25), KeyClass("10000", "Suzuki", 30), KeyClass("20000", "Momimoto", 22), KeyClass("20000", "Hanada", 27), KeyClass("20000", "Yamamoto", 19), KeyClass("30000", "Ken", 22), KeyClass("30000", "Mike", 23), KeyClass("30000", "Jusmine", 21), KeyClass("40000", "hoge", 20), KeyClass("40000", "foo", 20), KeyClass("40000", "bar", 20), KeyClass("50000", "Java", 18), KeyClass("50000", "Scala", 10), KeyClass("50000", "Clojure", 6) ) for (keyValue <- groupKeyValues) cacheAsCustomizedKeyClass.put(keyValue, keyValue) for (keyValue <- groupKeyValues) { val dm = cacheAsCustomizedKeyClass.getAdvancedCache.getDistributionManager println(s"PrimaryLocation: ${dm.getPrimaryLocation(keyValue)}, Locate:${dm.locate(keyValue)}") println(s" $keyValue") }
今回は、キーと値は同じクラスにしました。また、最後にキーの分布状態を出力しています。
では、浮いててもらうサーバを2本ほど立ち上げ
# No.1 $ sbt "run-main EmbeddedCacheServer" # No.2 $ sbt "run-main EmbeddedCacheServer"
実行。
> run-main InfinispanGroupApi [info] Running InfinispanGroupApi 〜省略〜 [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] KeyClass => code:[10000], name:[Sato], age:[20] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] KeyClass => code:[10000], name:[Tanaka], age:[25] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] KeyClass => code:[10000], name:[Suzuki], age:[30] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] KeyClass => code:[20000], name:[Momimoto], age:[22] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] KeyClass => code:[20000], name:[Hanada], age:[27] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] KeyClass => code:[20000], name:[Yamamoto], age:[19] [info] PrimaryLocation: ubuntu-35472, Locate:[ubuntu-35472, ubuntu-9777] [info] KeyClass => code:[30000], name:[Ken], age:[22] [info] PrimaryLocation: ubuntu-35472, Locate:[ubuntu-35472, ubuntu-9777] [info] KeyClass => code:[30000], name:[Mike], age:[23] [info] PrimaryLocation: ubuntu-35472, Locate:[ubuntu-35472, ubuntu-9777] [info] KeyClass => code:[30000], name:[Jusmine], age:[21] [info] PrimaryLocation: ubuntu-9777, Locate:[ubuntu-9777, ubuntu-35472] [info] KeyClass => code:[40000], name:[hoge], age:[20] [info] PrimaryLocation: ubuntu-9777, Locate:[ubuntu-9777, ubuntu-35472] [info] KeyClass => code:[40000], name:[foo], age:[20] [info] PrimaryLocation: ubuntu-9777, Locate:[ubuntu-9777, ubuntu-35472] [info] KeyClass => code:[40000], name:[bar], age:[20] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] KeyClass => code:[50000], name:[Java], age:[18] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] KeyClass => code:[50000], name:[Scala], age:[10] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] KeyClass => code:[50000], name:[Clojure], age:[6]
今回は、codeごとにグループ化したので、同じcodeをモツものはPrimaryLocationが同じNodeになっているのがわかると思います。
外部のクラスにグループを指定させる
今度は、キーとなるクラス以外のクラスに、グループを決定させる方法です。ライブラリのクラスを使用するなどの理由で、キークラスを自分で制御できない場合に使用します。
まずは、キーとなるクラスを用意します。
@SerialVersionUID(1L) class NoGroupKeyClass(val code: String, val name: String, val age: Int) extends Serializable { override def equals(other: Any): Boolean = other match { case o: NoGroupKeyClass => code == o.code && name == o.name && age == o.age case _ => false } override def hashCode: Int = Objects.hashCode(code, name, age) override def toString: String = s"NoGroupKey => code:[$code], name:[$name], age:[$age]" } object NoGroupKeyClass { def apply(code: String, name: String, age: Int): NoGroupKeyClass = new NoGroupKeyClass(code, name, age) }
先のキークラスの例から、Groupアノテーションを外しただけですね。
そして、グループを決定させるクラスとして、Grouperインターフェースを実装したクラスを用意します。Grouperは、配置先が計算される前に、Interceptorとして動作するようです。
class MyGrouper extends Grouper[NoGroupKeyClass] { def computeGroup(key: NoGroupKeyClass, group: String): String = key.code def getKeyType: Class[NoGroupKeyClass] = classOf[NoGroupKeyClass] }
この時の型パラメータは、キーとなるクラスとなります。そして、computeGroupメソッドで、グループを決定するための値を返却します。ここでは、codeを返却しています。
そして、このGrouperインターフェースを実装したクラスを、Infinispanの定義ファイルに登録します。
<namedCache name="cacheAsGrouper"> <jmxStatistics enabled="true" /> <clustering mode="distribution"> <hash numOwners="2"> <groups enabled="true"> <grouper class="MyGrouper" /> </groups> </hash> </clustering> </namedCache>
groupsを有効にした上で、grouperを加えます。
<groups enabled="true"> <grouper class="MyGrouper" /> </groups>
では、先ほどと同じようにデータを用意します。今回も、キーと値は同じものを使用します。
val keyValues = Array(NoGroupKeyClass("10000", "Sato", 20), NoGroupKeyClass("10000", "Tanaka", 25), NoGroupKeyClass("10000", "Suzuki", 30), NoGroupKeyClass("20000", "Momimoto", 22), NoGroupKeyClass("20000", "Hanada", 27), NoGroupKeyClass("20000", "Yamamoto", 19), NoGroupKeyClass("30000", "Ken", 22), NoGroupKeyClass("30000", "Mike", 23), NoGroupKeyClass("30000", "Jusmine", 21), NoGroupKeyClass("40000", "hoge", 20), NoGroupKeyClass("40000", "foo", 20), NoGroupKeyClass("40000", "bar", 20), NoGroupKeyClass("50000", "Java", 18), NoGroupKeyClass("50000", "Scala", 10), NoGroupKeyClass("50000", "Clojure", 6) ) for (keyValue <- keyValues) cacheAsGrouper.put(keyValue, keyValue) for (keyValue <- keyValues) { val dm = cacheAsGrouper.getAdvancedCache.getDistributionManager println(s"PrimaryLocation: ${dm.getPrimaryLocation(keyValue)}, Locate:${dm.locate(keyValue)}") println(s" $keyValue") }
実行結果は、こちらです。
[info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] NoGroupKey => code:[10000], name:[Sato], age:[20] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] NoGroupKey => code:[10000], name:[Tanaka], age:[25] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] NoGroupKey => code:[10000], name:[Suzuki], age:[30] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] NoGroupKey => code:[20000], name:[Momimoto], age:[22] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] NoGroupKey => code:[20000], name:[Hanada], age:[27] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] NoGroupKey => code:[20000], name:[Yamamoto], age:[19] [info] PrimaryLocation: ubuntu-35472, Locate:[ubuntu-35472, ubuntu-9777] [info] NoGroupKey => code:[30000], name:[Ken], age:[22] [info] PrimaryLocation: ubuntu-35472, Locate:[ubuntu-35472, ubuntu-9777] [info] NoGroupKey => code:[30000], name:[Mike], age:[23] [info] PrimaryLocation: ubuntu-35472, Locate:[ubuntu-35472, ubuntu-9777] [info] NoGroupKey => code:[30000], name:[Jusmine], age:[21] [info] PrimaryLocation: ubuntu-9777, Locate:[ubuntu-9777, ubuntu-35472] [info] NoGroupKey => code:[40000], name:[hoge], age:[20] [info] PrimaryLocation: ubuntu-9777, Locate:[ubuntu-9777, ubuntu-35472] [info] NoGroupKey => code:[40000], name:[foo], age:[20] [info] PrimaryLocation: ubuntu-9777, Locate:[ubuntu-9777, ubuntu-35472] [info] NoGroupKey => code:[40000], name:[bar], age:[20] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] NoGroupKey => code:[50000], name:[Java], age:[18] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] NoGroupKey => code:[50000], name:[Scala], age:[10] [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-9777] [info] NoGroupKey => code:[50000], name:[Clojure], age:[6]
先ほどの、キークラス自身にグループを決定させた時と、同じ分散結果になっています。
*これは、先ほどの実行結果の続きを反映したものです
なお、この例ではキークラスを用意しましたが、別にStringなどでもかまいません。
例示しておきましょう。
class SimpleGrouper extends Grouper[String] { def computeGroup(key: String, group: String): String = key.head.toString def getKeyType: Class[String] = classOf[String] }
ここでは、キーとなるStringの先頭1文字でグルーピングします。
設定ファイル。
<namedCache name="cacheAsGrouperSimple"> <jmxStatistics enabled="true" /> <clustering mode="distribution"> <hash numOwners="2"> <groups enabled="true"> <grouper class="SimpleGrouper" /> </groups> </hash> </clustering> </namedCache>
サンプルコードと
val simpleKeyValues = Array(("10001", "Sato"), ("10002", "Tanaka"), ("10003", "Suzuki"), ("20001", "Momimoto"), ("20002", "Hanada"), ("20003", "Yamamoto"), ("30001", "Ken"), ("30002", "Mike"), ("30003", "Jusmine"), ("40001", "hoge"), ("40002", "foo"), ("40003", "bar"), ("50001", "Java"), ("50002", "Scala"), ("50003", "Clojure") ) for ((key, value) <- simpleKeyValues) cacheAsGrouperSimple.put(key, value) for ((key, value) <- simpleKeyValues) { val dm = cacheAsGrouperSimple.getAdvancedCache.getDistributionManager println(s"PrimaryLocation: ${dm.getPrimaryLocation(key)}, Locate:${dm.locate(key)}") println(s" $key:$value") }
実行結果。
[info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-35472] [info] 10001:Sato [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-35472] [info] 10002:Tanaka [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-35472] [info] 10003:Suzuki [info] PrimaryLocation: ubuntu-35472, Locate:[ubuntu-35472, ubuntu-41933] [info] 20001:Momimoto [info] PrimaryLocation: ubuntu-35472, Locate:[ubuntu-35472, ubuntu-41933] [info] 20002:Hanada [info] PrimaryLocation: ubuntu-35472, Locate:[ubuntu-35472, ubuntu-41933] [info] 20003:Yamamoto [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-35472] [info] 30001:Ken [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-35472] [info] 30002:Mike [info] PrimaryLocation: ubuntu-41933, Locate:[ubuntu-41933, ubuntu-35472] [info] 30003:Jusmine [info] PrimaryLocation: ubuntu-35472, Locate:[ubuntu-35472, ubuntu-9777] [info] 40001:hoge [info] PrimaryLocation: ubuntu-35472, Locate:[ubuntu-35472, ubuntu-9777] [info] 40002:foo [info] PrimaryLocation: ubuntu-35472, Locate:[ubuntu-35472, ubuntu-9777] [info] 40003:bar [info] PrimaryLocation: ubuntu-9777, Locate:[ubuntu-9777, ubuntu-35472] [info] 50001:Java [info] PrimaryLocation: ubuntu-9777, Locate:[ubuntu-9777, ubuntu-35472] [info] 50002:Scala [info] PrimaryLocation: ubuntu-9777, Locate:[ubuntu-9777, ubuntu-35472] [info] 50003:Clojure
どのNodeに配置されるかを制御することはできないですが、DistributionManagerからグループがどこに配置されているかはわかるようになるので、知っておくと運用時などに便利なのかな?
src/main/scala/InfinispanGroupApi.scala
import java.util.Objects import org.infinispan.distribution.group.{Group, Grouper} import org.infinispan.manager.DefaultCacheManager object InfinispanGroupApi { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val cacheAsCustomizedKeyClass = manager.getCache[KeyClass, KeyClass]("cacheAsCustomizedKeyClass") val cacheAsGrouper = manager.getCache[NoGroupKeyClass, NoGroupKeyClass]("cacheAsGrouper") val cacheAsGrouperSimple = manager.getCache[String, String]("cacheAsGrouperSimple") try { val groupKeyValues = Array(KeyClass("10000", "Sato", 20), KeyClass("10000", "Tanaka", 25), KeyClass("10000", "Suzuki", 30), KeyClass("20000", "Momimoto", 22), KeyClass("20000", "Hanada", 27), KeyClass("20000", "Yamamoto", 19), KeyClass("30000", "Ken", 22), KeyClass("30000", "Mike", 23), KeyClass("30000", "Jusmine", 21), KeyClass("40000", "hoge", 20), KeyClass("40000", "foo", 20), KeyClass("40000", "bar", 20), KeyClass("50000", "Java", 18), KeyClass("50000", "Scala", 10), KeyClass("50000", "Clojure", 6) ) for (keyValue <- groupKeyValues) cacheAsCustomizedKeyClass.put(keyValue, keyValue) for (keyValue <- groupKeyValues) { val dm = cacheAsCustomizedKeyClass.getAdvancedCache.getDistributionManager println(s"PrimaryLocation: ${dm.getPrimaryLocation(keyValue)}, Locate:${dm.locate(keyValue)}") println(s" $keyValue") } println("====================") val keyValues = Array(NoGroupKeyClass("10000", "Sato", 20), NoGroupKeyClass("10000", "Tanaka", 25), NoGroupKeyClass("10000", "Suzuki", 30), NoGroupKeyClass("20000", "Momimoto", 22), NoGroupKeyClass("20000", "Hanada", 27), NoGroupKeyClass("20000", "Yamamoto", 19), NoGroupKeyClass("30000", "Ken", 22), NoGroupKeyClass("30000", "Mike", 23), NoGroupKeyClass("30000", "Jusmine", 21), NoGroupKeyClass("40000", "hoge", 20), NoGroupKeyClass("40000", "foo", 20), NoGroupKeyClass("40000", "bar", 20), NoGroupKeyClass("50000", "Java", 18), NoGroupKeyClass("50000", "Scala", 10), NoGroupKeyClass("50000", "Clojure", 6) ) for (keyValue <- keyValues) cacheAsGrouper.put(keyValue, keyValue) for (keyValue <- keyValues) { val dm = cacheAsGrouper.getAdvancedCache.getDistributionManager println(s"PrimaryLocation: ${dm.getPrimaryLocation(keyValue)}, Locate:${dm.locate(keyValue)}") println(s" $keyValue") } println("====================") val simpleKeyValues = Array(("10001", "Sato"), ("10002", "Tanaka"), ("10003", "Suzuki"), ("20001", "Momimoto"), ("20002", "Hanada"), ("20003", "Yamamoto"), ("30001", "Ken"), ("30002", "Mike"), ("30003", "Jusmine"), ("40001", "hoge"), ("40002", "foo"), ("40003", "bar"), ("50001", "Java"), ("50002", "Scala"), ("50003", "Clojure") ) for ((key, value) <- simpleKeyValues) cacheAsGrouperSimple.put(key, value) for ((key, value) <- simpleKeyValues) { val dm = cacheAsGrouperSimple.getAdvancedCache.getDistributionManager println(s"PrimaryLocation: ${dm.getPrimaryLocation(key)}, Locate:${dm.locate(key)}") println(s" $key:$value") } } finally { cacheAsCustomizedKeyClass.stop() cacheAsGrouper.stop() cacheAsGrouperSimple.stop() manager.stop() } } } /** 自分でGroup制御を行うキークラス **/ object KeyClass { def apply(code: String, name: String, age: Int): KeyClass = new KeyClass(code, name, age) } @SerialVersionUID(1L) class KeyClass(val code: String, val name: String, val age: Int) extends Serializable { @Group def getCode: String = code override def equals(other: Any): Boolean = other match { case o: KeyClass => code == o.code && name == o.name && age == o.age case _ => false } override def hashCode: Int = Objects.hashCode(code, name, age) override def toString: String = s"KeyClass => code:[$code], name:[$name], age:[$age]" } /** 外部のGrouperでGroup制御を行うキークラス **/ object NoGroupKeyClass { def apply(code: String, name: String, age: Int): NoGroupKeyClass = new NoGroupKeyClass(code, name, age) } @SerialVersionUID(1L) class NoGroupKeyClass(val code: String, val name: String, val age: Int) extends Serializable { override def equals(other: Any): Boolean = other match { case o: NoGroupKeyClass => code == o.code && name == o.name && age == o.age case _ => false } override def hashCode: Int = Objects.hashCode(code, name, age) override def toString: String = s"NoGroupKey => code:[$code], name:[$name], age:[$age]" } class MyGrouper extends Grouper[NoGroupKeyClass] { def computeGroup(key: NoGroupKeyClass, group: String): String = key.code def getKeyType: Class[NoGroupKeyClass] = classOf[NoGroupKeyClass] } class SimpleGrouper extends Grouper[String] { def computeGroup(key: String, group: String): String = key.head.toString def getKeyType: Class[String] = classOf[String] }