CLOVER🍀

That was when it all began.

複数ロウを返すThrift APIを使ってみる

以下のエントリの続きです。
http://d.hatena.ne.jp/Kazuhira/20130428/1367138418

今回は、複数のロウを取得するAPIを使用していこうと思います。

対象とする、キースペースとカラムファミリの作成コマンドは、以下になります。

[default@unknown] create keyspace Room;
7b6e7ec9-d6ef-3d5e-9e54-53784a03e59b
[default@unknown] use Room;
Authenticated to keyspace: Room
[default@Room] create column family Users
...	with key_validation_class = 'UTF8Type'
...	and comparator = 'UTF8Type'
...	and default_validation_class = 'UTF8Type';
98bb2038-8b70-3a06-b77f-2d148941fee2

また、ExpandMetaClassを使用した拡張も、そのまま使用します。

String.metaClass {
    asBinary << { getBytes(StandardCharsets.UTF_8) }
    asBuffer << { ByteBuffer.wrap(asBinary()) }
}

Cassandra.Client.metaClass {
    'static' {
        openWith << { conn, cls ->
            def transport = new TFramedTransport(new TSocket(conn['host'], conn['port']))
            def protocol = new TBinaryProtocol(transport)
            def client = new Cassandra.Client(protocol)

            transport.open()

            try {
                client.set_keyspace(conn['keyspace'])
                cls(client)
            } finally {
                transport.close()
            }
        }
    }
}

Column.metaClass.static.create << { name, value, timestamp ->
    def column = new Column(name.toString().asBuffer())
    column.setValue(value.toString().asBuffer())
    column.timestamp = timestamp
    column
}

Mutation.metaClass {
    'static' {
        columnsMap << { key, columnFamilyName, columns ->
            def mutations = []
            for (column in columns) {
                def colOrSuper = new ColumnOrSuperColumn().setColumn(column)
                def mutation = new Mutation()
                mutation.column_or_supercolumn = colOrSuper
                mutations << mutation
            }
            def map = [: ]
            def mutationMap = [: ]
            mutationMap[columnFamilyName] = mutations
            map[key.asBuffer()] = mutationMap
            map
        }
    }
}

そして、ここからのコードは、以下の箇所を埋めるものとし

Cassandra.Client.openWith([host: 'localhost',
                           port: 9160,
                           keyspace: 'Room']) { client ->
    def columnFamilyName = 'Users'
    def columnParent = new ColumnParent(columnFamilyName)
    def timestamp = System.currentTimeMillis() * 1000

    // ここ!!
}

以下のimport文が書かれているものとします。

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

@Grab('org.apache.cassandra:cassandra-all:1.2.4')
import org.apache.cassandra.thrift.Cassandra
import org.apache.cassandra.thrift.Column
import org.apache.cassandra.thrift.ColumnOrSuperColumn
import org.apache.cassandra.thrift.ColumnParent
import org.apache.cassandra.thrift.ColumnPath
import org.apache.cassandra.thrift.ConsistencyLevel
import org.apache.cassandra.thrift.Deletion
import org.apache.cassandra.thrift.IndexClause
import org.apache.cassandra.thrift.IndexExpression
import org.apache.cassandra.thrift.IndexOperator
import org.apache.cassandra.thrift.KeyRange
import org.apache.cassandra.thrift.KeySlice
import org.apache.cassandra.thrift.Mutation
import org.apache.cassandra.thrift.SlicePredicate
import org.apache.cassandra.thrift.SliceRange

import org.apache.thrift.transport.TTransport
import org.apache.thrift.transport.TFramedTransport
import org.apache.thrift.transport.TSocket
import org.apache.thrift.protocol.TProtocol
import org.apache.thrift.protocol.TBinaryProtocol

とりあえず、データ登録。

    // データ登録
    client.batch_mutate(Mutation.columnsMap('1',
                                            columnFamilyName,
                                            [Column.create('name', 'Suzuki Taro', timestamp),
                                             Column.create('age', 20, timestamp),
                                             Column.create('occupation', 'System Engineer', timestamp)]),
                        ConsistencyLevel.ALL)

    client.batch_mutate(Mutation.columnsMap('2',
                                            columnFamilyName,
                                            [Column.create('name', 'Tanaka Jiro', timestamp),
                                             Column.create('age', 22, timestamp),
                                             Column.create('occupation', 'Programmer', timestamp)]),
                        ConsistencyLevel.ALL)

    client.batch_mutate(Mutation.columnsMap('3',
                                            columnFamilyName,
                                            [Column.create('name', 'Nakata Saburo', timestamp),
                                             Column.create('age', 27, timestamp),
                                             Column.create('occupation', 'Sales Engineer', timestamp)]),
                        ConsistencyLevel.ALL)

こんなデータが入っています。

[default@Room] list Users;
Using default limit of 100
Using default column limit of 100
-------------------
RowKey: 3
=> (column=age, value=27, timestamp=1367680331949000)
=> (column=name, value=Nakata Saburo, timestamp=1367680331949000)
=> (column=occupation, value=Sales Engineer, timestamp=1367680331949000)
-------------------
RowKey: 2
=> (column=age, value=22, timestamp=1367680331949000)
=> (column=name, value=Tanaka Jiro, timestamp=1367680331949000)
=> (column=occupation, value=Programmer, timestamp=1367680331949000)
-------------------
RowKey: 1
=> (column=age, value=20, timestamp=1367680331949000)
=> (column=name, value=Suzuki Taro, timestamp=1367680331949000)
=> (column=occupation, value=System Engineer, timestamp=1367680331949000)

3 Rows Returned.
Elapsed time: 18 msec(s).

では、いってみましょう。

Cassanra.Client#multiget_slice

ロウキーを複数指定して、対応するそれぞれのロウを取得するメソッドです。

まずはSlicePredicateを作成します。

    def multiGetSlicePredicate = new SlicePredicate()
    multiGetSlicePredicate.slice_range = new SliceRange(ByteBuffer.wrap(new byte[0]),
                                                        ByteBuffer.wrap(new byte[0]),
                                                        false,
                                                        100)

SliceRangeの意味は、これの元となったエントリを参照してくださいな。

そして、ロウキーのByteBufferのList、ColumnParent、SlicePredicate、ConsistencyLevelを渡してmultiget_sliceメソッドを呼び出します。

    def multiGetSliceMap =
        client.multiget_slice(['1'.asBuffer(), '2'.asBuffer(), '3'.asBuffer()],
                              columnParent,
                              multiGetSlicePredicate,
                              ConsistencyLevel.ONE)

multiget_sliceメソッドのシグニチャ

public java.util.Map<java.nio.ByteBuffer,java.util.List<ColumnOrSuperColumn>> multiget_slice(java.util.List<java.nio.ByteBuffer> keys,
                                                                                             ColumnParent column_parent,
                                                                                             SlicePredicate predicate,
                                                                                             ConsistencyLevel consistency_level)
                                                                                      throws InvalidRequestException,
                                                                                             UnavailableException,
                                                                                             TimedOutException,
                                                                                             org.apache.thrift.TException

となっていて、戻り値はByteBufferをキー、ColumnOrSuperColumnのListを値にしたMapです。

ということは、ロウキーと対応するカラムのListがペアになっているんだろうと思いましたが、これが大間違い。

キーとなるByteBufferには、実はロウキーもカラムもすべての値が入っているので、普通にStringに戻そうとするとおかしな結果になります。

Mapの中を見てみると、一応キーはカラムのListごとに異なるといった形になっていて、positionとlimitが異なるByteBufferが入っていました。それが、値に対応するロウキーの位置を指しているんだろうと。

というわけで、こんなコードを書いてみました。

    multiGetSliceMap.each { buffer, columnOrSuperColumns ->
        def start = buffer.position()
        def limit = buffer.limit()
        def bytes = new byte[limit - start]
        (start..<limit).eachWithIndex { c, i -> bytes[i] = buffer.get(c) }

        def key = new String(bytes, StandardCharsets.UTF_8)
        println("key => $key")

        columnOrSuperColumns.each { columnOrSuperColumn ->
            def column = columnOrSuperColumn.column
            println('get multi column: [name: value] => [' +
                        new String(column.name, StandardCharsets.UTF_8) + ': ' +
                        new String(column.value, StandardCharsets.UTF_8) + ']')
        }
    }

では、実行。

key => 2
get multi column: [name: value] => [age: 22]
get multi column: [name: value] => [name: Tanaka Jiro]
get multi column: [name: value] => [occupation: Programmer]
key => 1
get multi column: [name: value] => [age: 20]
get multi column: [name: value] => [name: Suzuki Taro]
get multi column: [name: value] => [occupation: System Engineer]
key => 3
get multi column: [name: value] => [age: 27]
get multi column: [name: value] => [name: Nakata Saburo]
get multi column: [name: value] => [occupation: Sales Engineer]

ロウキーの順番は、決まってないのかな?

Cassandra.Client#multiget_count

使い方としては、multiget_sliceとほぼ同じです。違うのは、戻り値がロウキーとカラム数だということです。

    // multiget_countを使って、複数ロウのカラム数を取得
    def multiGetCountPredicate = new SlicePredicate()
    multiGetCountPredicate.slice_range = new SliceRange(ByteBuffer.wrap(new byte[0]),
                                                        ByteBuffer.wrap(new byte[0]),
                                                        false,
                                                        100)

    def multiGetCountMap =
        client.multiget_count(['1'.asBuffer(), '2'.asBuffer(), '3'.asBuffer()],
                              columnParent,
                              multiGetCountPredicate,
                              ConsistencyLevel.ONE)

    multiGetCountMap.each { buffer, count ->
        def start = buffer.position()
        def limit = buffer.limit()
        def bytes = new byte[limit - start]
        (start..<limit).eachWithIndex { c, i -> bytes[i] = buffer.get(c) }

        def key = new String(bytes, StandardCharsets.UTF_8)
        println("key: count => $key: $count")
    }

最後の結果出力の部分以外は、ほぼ同じコードです。

実行結果。

key: count => 2: 3
key: count => 1: 3
key: count => 3: 3

Cassandra.Client#get_range_slices

キーの範囲指定をして、複数ロウを取得するメソッドです。

ここでは、KeyRangeを使用してロウキーの範囲を指定します。

    def rangeSliceKeyRange = new KeyRange()
    // 全キー指定
    rangeSliceKeyRange.start_key = new byte[0]
    rangeSliceKeyRange.end_key = new byte[0]

長さ0のbyte配列を開始と終了に指定すると、全キーを指定したことになるのは、カラムの名前指定と同じですね。

ちなみに、実際に範囲指定を行う場合は、Partitionerに順序のあるもの(Ordered Partitioner)を指定する必要があります。

デフォルトのRandomPartitionerでキーの範囲を指定使用とすると、例外が飛んできます。

    // これは、以下のような結果になる
    // InvalidRequestException(why:start key's token sorts after end key's token.  this is not allowed; you probably should not specify end key at all except with an ordered partitioner)
    //rangeSliceKeyRange.start_key = '1'.asBinary()
    //rangeSliceKeyRange.end_key = '2'.asBinary()

そして、ColumnParent、SlicePredicate、KeyRange、ConsistencyLevelを指定して、get_range_slicesメソッドを呼び出します。

    def getRangeKeySlices =
        client.get_range_slices(columnParent,
                                rangeSlicesPredicate,
                                rangeSliceKeyRange,
                                ConsistencyLevel.ALL)

戻り値はKeySliceのListになります。

    getRangeKeySlices.each { keySlice ->
        def key = new String(keySlice.key, StandardCharsets.UTF_8)
        println("key => $key")

        keySlice.columns.each { columnOrSuperColumn ->
            def column = columnOrSuperColumn.column
            println('get range slices column: [name: value] => [' +
                        new String(column.name, StandardCharsets.UTF_8) + ': ' +
                        new String(column.value, StandardCharsets.UTF_8) + ']')
        }
    }

KeySlice#getKeyやKeySlice#getColumnsでロウキーやカラムのListが取得できるので、あとは他の例と同じように。

実行結果。

key => 3
get range slices column: [name: value] => [age: 27]
get range slices column: [name: value] => [name: Nakata Saburo]
get range slices column: [name: value] => [occupation: Sales Engineer]
key => 2
get range slices column: [name: value] => [age: 22]
get range slices column: [name: value] => [name: Tanaka Jiro]
get range slices column: [name: value] => [occupation: Programmer]
key => 1
get range slices column: [name: value] => [age: 20]
get range slices column: [name: value] => [name: Suzuki Taro]
get range slices column: [name: value] => [occupation: System Engineer]

Cassandra.Client#get_paged_slice

get_range_slicesメソッドの、簡易版的な?

KeyRangeを使用するところは、同じです。

    def pagedKeyRange = new KeyRange()
    pagedKeyRange.start_key = new byte[0]
    pagedKeyRange.end_key = new byte[0]

その後は、カラムファミリ名、KeyRange、ByteBuffer(開始カラム)、ConsistencyLevelを指定して、メソッドを呼び出します。

    def getPagedKeySlices =
        client.get_paged_slice(columnFamilyName,
                               pagedKeyRange,
                               ByteBuffer.wrap(new byte[0]),  // start_columnを指定
                               ConsistencyLevel.ALL)
    // start_columnの指定は、最初の取得結果1件目にしか効果がない??

返ってくる値のフォーマットは、get_range_slicesと同じです。

    getPagedKeySlices.each { keySlice ->
        def key = new String(keySlice.key, StandardCharsets.UTF_8)
        println("key => $key")

        keySlice.columns.each { columnOrSuperColumn ->
            def column = columnOrSuperColumn.column
            println('get paged slice column: [name: value] => [' +
                        new String(column.name, StandardCharsets.UTF_8) + ': ' +
                        new String(column.value, StandardCharsets.UTF_8) + ']')
        }
    }

実行結果は割愛します。

ところで、get_paged_sliceでは開始カラムが指定できますが、試した感じだと最初の1件目のロウは確かに開始カラム以降のカラムが返却されます。ただ、2件目からは全カラムが返却されてしまいました…。よくわかりませんね?

Cassandra.Client#get_indexed_slices

これは、他のメソッドと少し趣向が異なります。ロウキーは使用せずに、セカンダリインデックスを使用するものです。

casandra-cliでいえば、こういう書き方になります。

[default@Room] get Users where age = 27;
[default@Room] get Users where occupation = 'Programmer';
[default@Room] get Users where occupation = 'System Engineer' and age >= 20;

これを使用するには、セカンダリインデックスが必要です。ただのカラムに実行すると

[default@Room] get Users where age = 27;
No indexed columns present in index clause with operator EQ

と怒られます。

get_indexed_slicesメソッドを使用しても、同様に怒られます。

Caught: InvalidRequestException(why:No indexed columns present in index clause with operator EQ)
InvalidRequestException(why:No indexed columns present in index clause with operator EQ)

というか、メッセージ同じですよね。

というわけで、まずはセカンダリインデックスを作成します。

Getting started using the Cassandra CLI
http://www.datastax.com/docs/1.2/cql_cli/using_cli

上記の「Indexing a column」を参考に、セカンダリインデックスを作成。

[default@Room] update column family Users
...	with column_metadata = 
...	[{column_name: age,
...	validation_class: UTF8Type,
...	index_type: KEYS},
...	{column_name: occupation,
...	validation_class: UTF8Type,
...	index_type: KEYS}];
e49f6fec-32b5-30ea-9419-21b88cce26d8

describeすると、「Built indexes」と「Column Metadata」の項目が変更、もしくは追加されます。

    ColumnFamily: Users
      Key Validation Class: org.apache.cassandra.db.marshal.UTF8Type
      Default column value validator: org.apache.cassandra.db.marshal.UTF8Type
      Columns sorted by: org.apache.cassandra.db.marshal.UTF8Type
      GC grace seconds: 864000
      Compaction min/max thresholds: 4/32
      Read repair chance: 0.1
      DC Local Read repair chance: 0.0
      Populate IO Cache on flush: false
      Replicate on write: true
      Caching: KEYS_ONLY
      Bloom Filter FP chance: default
      Built indexes: [Users.Users_age_idx, Users.Users_occupation_idx]
      Column Metadata:
        Column Name: occupation
          Validation Class: org.apache.cassandra.db.marshal.UTF8Type
          Index Name: Users_occupation_idx
          Index Type: KEYS
        Column Name: age
          Validation Class: org.apache.cassandra.db.marshal.UTF8Type
          Index Name: Users_age_idx
          Index Type: KEYS
      Compaction Strategy: org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy
      Compression Options:
        sstable_compression: org.apache.cassandra.io.compress.SnappyCompressor

ageとoccupationカラムに、セカンダリインデックスが貼られたことになっています。

では、Thrift APIの方へ。

まずは、SlicePredicateを作成します。

    def indexPredicate = new SlicePredicate()
    indexPredicate.slice_range = new SliceRange(ByteBuffer.wrap(new byte[0]),  // start
                                                ByteBuffer.wrap(new byte[0]),  // finish
                                                false,  // reverse
                                                100)  // count

続いて、IndexClauseを作成。

    def indexClause = new IndexClause([new IndexExpression('occupation'.asBuffer(),
                                                           IndexOperator.EQ,
                                                           'System Engineer'.asBuffer()),
                                          new IndexExpression('age'.asBuffer(),
                                                              IndexOperator.GTE,
                                                              '20'.asBuffer())],
                                      ByteBuffer.wrap(new byte[0]),  // start key
                                      100)  // count

startKeyというのは、おそらくロウキーの開始を指定しているのだと思いますが…。
countは、取得するロウの数のようです。

IndexExpressionのListは、検索する条件を表しています。

IndexExpressionのコンストラクタ引数は、それぞれ

となっています。

IndexOperatorは、EQ、GT、GTE、LT、LTEがあります。ただし、条件の中に少なくともひとつはEQの演算子を含める必要があります。

このクエリは、以下のcliのコマンドと同じ意味です。

[default@Room] get Users where occupation = 'System Engineer' and age >= 20;

ちなみに、クエリの中に少なくとも

あとは、ColumnParent、作成したIndexClause、SlicePredicate、ConsistencyLevelを指定して、get_indexed_slicesメソッドを呼び出します。

    def getIndexedKeySlices =
        client.get_indexed_slices(columnParent,
                                  indexClause,
                                  indexPredicate,
                                  ConsistencyLevel.ALL)

戻り値はKeySliceのListなので、これまでの例と同じように扱えばOKです。

    getIndexedKeySlices.each { keySlice ->
        def key = new String(keySlice.key, StandardCharsets.UTF_8)
        println("key => $key")

        keySlice.columns.each { columnOrSuperColumn ->
            def column = columnOrSuperColumn.column
            println('get indexed slice column: [name: value] => [' +
                        new String(column.name, StandardCharsets.UTF_8) + ': ' +
                        new String(column.value, StandardCharsets.UTF_8) + ']')
        }
    }

実行結果。

key => 1
get indexed slice column: [name: value] => [age: 20]
get indexed slice column: [name: value] => [name: Suzuki Taro]
get indexed slice column: [name: value] => [occupation: System Engineer]

セカンダリインデックスは、いい収穫でした。

一応、今回書いたコード全部です。

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

@Grab('org.apache.cassandra:cassandra-all:1.2.4')
import org.apache.cassandra.thrift.Cassandra
import org.apache.cassandra.thrift.Column
import org.apache.cassandra.thrift.ColumnOrSuperColumn
import org.apache.cassandra.thrift.ColumnParent
import org.apache.cassandra.thrift.ColumnPath
import org.apache.cassandra.thrift.ConsistencyLevel
import org.apache.cassandra.thrift.Deletion
import org.apache.cassandra.thrift.IndexClause
import org.apache.cassandra.thrift.IndexExpression
import org.apache.cassandra.thrift.IndexOperator
import org.apache.cassandra.thrift.KeyRange
import org.apache.cassandra.thrift.KeySlice
import org.apache.cassandra.thrift.Mutation
import org.apache.cassandra.thrift.SlicePredicate
import org.apache.cassandra.thrift.SliceRange

import org.apache.thrift.transport.TTransport
import org.apache.thrift.transport.TFramedTransport
import org.apache.thrift.transport.TSocket
import org.apache.thrift.protocol.TProtocol
import org.apache.thrift.protocol.TBinaryProtocol

String.metaClass {
    asBinary << { getBytes(StandardCharsets.UTF_8) }
    asBuffer << { ByteBuffer.wrap(asBinary()) }
}

Cassandra.Client.metaClass {
    'static' {
        openWith << { conn, cls ->
            def transport = new TFramedTransport(new TSocket(conn['host'], conn['port']))
            def protocol = new TBinaryProtocol(transport)
            def client = new Cassandra.Client(protocol)

            transport.open()

            try {
                client.set_keyspace(conn['keyspace'])
                cls(client)
            } finally {
                transport.close()
            }
        }
    }
}

Column.metaClass.static.create << { name, value, timestamp ->
    def column = new Column(name.toString().asBuffer())
    column.setValue(value.toString().asBuffer())
    column.timestamp = timestamp
    column
}

Mutation.metaClass {
    'static' {
        columnsMap << { key, columnFamilyName, columns ->
            def mutations = []
            for (column in columns) {
                def colOrSuper = new ColumnOrSuperColumn().setColumn(column)
                def mutation = new Mutation()
                mutation.column_or_supercolumn = colOrSuper
                mutations << mutation
            }
            def map = [: ]
            def mutationMap = [: ]
            mutationMap[columnFamilyName] = mutations
            map[key.asBuffer()] = mutationMap
            map
        }
    }
}

Cassandra.Client.openWith([host: 'localhost',
                           port: 9160,
                           keyspace: 'Room']) { client ->
    def columnFamilyName = 'Users'
    def columnParent = new ColumnParent(columnFamilyName)
    def timestamp = System.currentTimeMillis() * 1000

    // データ登録
    client.batch_mutate(Mutation.columnsMap('1',
                                            columnFamilyName,
                                            [Column.create('name', 'Suzuki Taro', timestamp),
                                             Column.create('age', 20, timestamp),
                                             Column.create('occupation', 'System Engineer', timestamp)]),
                        ConsistencyLevel.ALL)

    client.batch_mutate(Mutation.columnsMap('2',
                                            columnFamilyName,
                                            [Column.create('name', 'Tanaka Jiro', timestamp),
                                             Column.create('age', 22, timestamp),
                                             Column.create('occupation', 'Programmer', timestamp)]),
                        ConsistencyLevel.ALL)

    client.batch_mutate(Mutation.columnsMap('3',
                                            columnFamilyName,
                                            [Column.create('name', 'Nakata Saburo', timestamp),
                                             Column.create('age', 27, timestamp),
                                             Column.create('occupation', 'Sales Engineer', timestamp)]),
                        ConsistencyLevel.ALL)

    // multiget_sliceを使って、複数ロウを取得
    def multiGetSlicePredicate = new SlicePredicate()
    multiGetSlicePredicate.slice_range = new SliceRange(ByteBuffer.wrap(new byte[0]),
                                                        ByteBuffer.wrap(new byte[0]),
                                                        false,
                                                        100)

    def multiGetSliceMap =
        client.multiget_slice(['1'.asBuffer(), '2'.asBuffer(), '3'.asBuffer()],
                              columnParent,
                              multiGetSlicePredicate,
                              ConsistencyLevel.ONE)

    multiGetSliceMap.each { buffer, columnOrSuperColumns ->
        def start = buffer.position()
        def limit = buffer.limit()
        def bytes = new byte[limit - start]
        (start..<limit).eachWithIndex { c, i -> bytes[i] = buffer.get(c) }

        def key = new String(bytes, StandardCharsets.UTF_8)
        println("key => $key")

        columnOrSuperColumns.each { columnOrSuperColumn ->
            def column = columnOrSuperColumn.column
            println('get multi column: [name: value] => [' +
                        new String(column.name, StandardCharsets.UTF_8) + ': ' +
                        new String(column.value, StandardCharsets.UTF_8) + ']')
        }
    }

    // multiget_countを使って、複数ロウのカラム数を取得
    def multiGetCountPredicate = new SlicePredicate()
    multiGetCountPredicate.slice_range = new SliceRange(ByteBuffer.wrap(new byte[0]),
                                                        ByteBuffer.wrap(new byte[0]),
                                                        false,
                                                        100)

    def multiGetCountMap =
        client.multiget_count(['1'.asBuffer(), '2'.asBuffer(), '3'.asBuffer()],
                              columnParent,
                              multiGetCountPredicate,
                              ConsistencyLevel.ONE)

    multiGetCountMap.each { buffer, count ->
        def start = buffer.position()
        def limit = buffer.limit()
        def bytes = new byte[limit - start]
        (start..<limit).eachWithIndex { c, i -> bytes[i] = buffer.get(c) }

        def key = new String(bytes, StandardCharsets.UTF_8)
        println("key: count => $key: $count")
    }

    // get_range_slicesで、ロウキーの範囲で取得
    def rangeSlicesPredicate = new SlicePredicate()
    rangeSlicesPredicate.slice_range = new SliceRange(ByteBuffer.wrap(new byte[0]),
                                                      ByteBuffer.wrap(new byte[0]),
                                                      false,
                                                      100)

    def rangeSliceKeyRange = new KeyRange()
    // 全キー指定
    rangeSliceKeyRange.start_key = new byte[0]
    rangeSliceKeyRange.end_key = new byte[0]
    // とはいえ、RandomPartitionerではキーの範囲指定はできませんが…
    // InvalidRequestException(why:start key's token sorts after end key's token.  this is not allowed; you probably should not specify end key at all except with an ordered partitioner)
    //rangeSliceKeyRange.start_key = '1'.asBinary()
    //rangeSliceKeyRange.end_key = '2'.asBinary()

    def getRangeKeySlices =
        client.get_range_slices(columnParent,
                                rangeSlicesPredicate,
                                rangeSliceKeyRange,
                                ConsistencyLevel.ALL)

    getRangeKeySlices.each { keySlice ->
        def key = new String(keySlice.key, StandardCharsets.UTF_8)
        println("key => $key")

        keySlice.columns.each { columnOrSuperColumn ->
            def column = columnOrSuperColumn.column
            println('get range slices column: [name: value] => [' +
                        new String(column.name, StandardCharsets.UTF_8) + ': ' +
                        new String(column.value, StandardCharsets.UTF_8) + ']')
        }
    }

    // get_paged_sliceを使うと、get_range_slicesを少し簡単に呼べる
    def pagedKeyRange = new KeyRange()
    pagedKeyRange.start_key = new byte[0]
    pagedKeyRange.end_key = new byte[0]

    def getPagedKeySlices =
        client.get_paged_slice(columnFamilyName,
                               pagedKeyRange,
                               ByteBuffer.wrap(new byte[0]),  // start_columnを指定
                               ConsistencyLevel.ALL)
    // start_columnの指定は、最初の取得結果1件目にしか効果がない??

    getPagedKeySlices.each { keySlice ->
        def key = new String(keySlice.key, StandardCharsets.UTF_8)
        println("key => $key")

        keySlice.columns.each { columnOrSuperColumn ->
            def column = columnOrSuperColumn.column
            println('get paged slice column: [name: value] => [' +
                        new String(column.name, StandardCharsets.UTF_8) + ': ' +
                        new String(column.value, StandardCharsets.UTF_8) + ']')
        }
    }

    // get_indexed_slicesを使った、セカンダリインデックスを使用する検索
    def indexPredicate = new SlicePredicate()
    indexPredicate.slice_range = new SliceRange(ByteBuffer.wrap(new byte[0]),  // start
                                                ByteBuffer.wrap(new byte[0]),  // finish
                                                false,  // reverse
                                                100)  // count

    def indexClause = new IndexClause([new IndexExpression('occupation'.asBuffer(),
                                                           IndexOperator.EQ,
                                                           'System Engineer'.asBuffer()),
                                          new IndexExpression('age'.asBuffer(),
                                                              IndexOperator.GTE,
                                                              '20'.asBuffer())],
                                      ByteBuffer.wrap(new byte[0]),  // start key
                                      100)  // count

    def getIndexedKeySlices =
        client.get_indexed_slices(columnParent,
                                  indexClause,
                                  indexPredicate,
                                  ConsistencyLevel.ALL)

    getIndexedKeySlices.each { keySlice ->
        def key = new String(keySlice.key, StandardCharsets.UTF_8)
        println("key => $key")

        keySlice.columns.each { columnOrSuperColumn ->
            def column = columnOrSuperColumn.column
            println('get indexed slice column: [name: value] => [' +
                        new String(column.name, StandardCharsets.UTF_8) + ': ' +
                        new String(column.value, StandardCharsets.UTF_8) + ']')
        }
    }
}