CLOVER🍀

That was when it all began.

もっとCassandraのJavaクライアントプログラミング

前回は、CassandraのThriftサンプルを意味はあまり考えずにGroovyに書き直したものでしたが、今回はちょっと意味の理解も踏まえて書いてみることにしました。

まずは、キースペースとカラムファミリの用意。

[default@unknown] create keyspace Room;
7b6e7ec9-d6ef-3d5e-9e54-53784a03e59b
[default@unknown] use Room;
Authenticated to keyspace: Room
[default@Room] create column family Users
...	with key_validation_class = 'UTF8Type'
...	and comparator = 'UTF8Type'
...	and default_validation_class = 'UTF8Type';
98bb2038-8b70-3a06-b77f-2d148941fee2

はい。

続いて、Groovy側ですが今度は冗長に書いていくのは嫌だったので、少しExpandMetaClassを使ったショートカットを追加しました。

Stringからbyte配列やByteBufferの変更が多かったり

String.metaClass {
    asBinary << { getBytes(StandardCharsets.UTF_8) }
    asBuffer << { ByteBuffer.wrap(asBinary()) }
}

Cassandraへの接続を隠しておいたり

Cassandra.Client.metaClass {
    'static' {
        openWith << { conn, cls ->
            def transport = new TFramedTransport(new TSocket(conn['host'], conn['port']))
            def protocol = new TBinaryProtocol(transport)
            def client = new Cassandra.Client(protocol)

            transport.open()

            try {
                client.set_keyspace(conn['keyspace'])
                cls(client)
            } finally {
                transport.close()
            }
        }
    }
}

Columnのインスタンス生成って、けっこう面倒ですよねーとか

Column.metaClass.static.create << { name, value, timestamp ->
    def column = new Column(name.toString().asBuffer())
    column.setValue(value.toString().asBuffer())
    column.timestamp = timestamp
    column
}

Mutationの作成なんて、素で書くと辛そうな感じだったので…。

Mutation.metaClass {
    'static' {
        columnsMap << { key, columnFamilyName, columns ->
            def mutations = []
            for (column in columns) {
                def colOrSuper = new ColumnOrSuperColumn().setColumn(column)
                def mutation = new Mutation()
                mutation.column_or_supercolumn = colOrSuper
                mutations << mutation
            }
            def map = [: ]
            def mutationMap = [: ]
            mutationMap[columnFamilyName] = mutations
            map[key.asBuffer()] = mutationMap
            map
        }

        deleteColumnsMap << { key, columnFamilyName, columnNames, timestamp ->
            def predicate = new SlicePredicate().setColumn_names(columnNames.collect { it.asBuffer() })
            def deletion = new Deletion()
            deletion.predicate = predicate
            deletion.timestamp = timestamp

            def mutation = new Mutation()
            mutation.deletion = deletion

            def map = [: ]
            def mutationMap = [: ]
            mutationMap[columnFamilyName] = [mutation]
            map[key.asBuffer()] = mutationMap
            map
        }
    }
}

ここから書くサンプルには、以下のimport文が入っているものとします。

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

@Grab('org.apache.cassandra:cassandra-all:1.2.4')
import org.apache.cassandra.thrift.Cassandra
import org.apache.cassandra.thrift.Column
import org.apache.cassandra.thrift.ColumnOrSuperColumn
import org.apache.cassandra.thrift.ColumnParent
import org.apache.cassandra.thrift.ColumnPath
import org.apache.cassandra.thrift.ConsistencyLevel
import org.apache.cassandra.thrift.Deletion
import org.apache.cassandra.thrift.KeyRange
import org.apache.cassandra.thrift.KeySlice
import org.apache.cassandra.thrift.Mutation
import org.apache.cassandra.thrift.SlicePredicate
import org.apache.cassandra.thrift.SliceRange

import org.apache.thrift.transport.TTransport
import org.apache.thrift.transport.TFramedTransport
import org.apache.thrift.transport.TSocket
import org.apache.thrift.protocol.TProtocol
import org.apache.thrift.protocol.TBinaryProtocol

で、先ほどCassandra.Clientに追加した、openWithメソッドに渡すClosureとして処理を書いていきます。

Cassandra.Client.openWith([host: 'localhost',
                           port: 9160,
                           keyspace: 'Room']) { client ->
    // ここに、Cassandra.Clientを使った処理を書く
}

Closureの引数としては、Cassandra.Clientのインスタンスが渡ってくるようになっています。

カラムファミリは「Users」で、対応するColumnParent、そして登録処理で使うタイムスタンプはあらかじめ作っておきます。

    def columnFamilyName = 'Users'
    def columnParent = new ColumnParent(columnFamilyName)

    def timestamp = System.currentTimeMillis() * 1000

ちなみに、タイムスタンプはナノ秒で保存しているそうなので、一応1000倍しています。

以下のColumnの項目を参照
http://wiki.apache.org/cassandra/DataModel

Cassandra.Client#insert

カラムをひとつずつ登録するメソッドです。

    // カラムをひとつずつ登録
    client.insert('1'.asBuffer(),
                  columnParent,
                  Column.create('name', 'Suzuki Taro', timestamp),
                  ConsistencyLevel.ALL)
    client.insert('1'.asBuffer(),
                  columnParent,
                  Column.create('age', 20, timestamp),
                  ConsistencyLevel.ALL)
    client.insert('1'.asBuffer(),
                  columnParent,
                  Column.create('occupation', 'System Engineer', timestamp),
                  ConsistencyLevel.ALL)

引数には、それぞれロウキー、ColumnParent、Column、ConsistencyLevelを設定します。ConsistencyLevelは、今回のサンプルでは全てもっとも整合性の高いALLを使っています。

StringにasBufferメソッドを追加しているので、

'1'.asBuffer()

みたいにちょっとだけ端折れています。また、Columnのインスタンスを作成するのは、ホントなら以下のように書くので、それだけで行数取るんですよね…。

Column.metaClass.static.create << { name, value, timestamp ->
    def column = new Column(name.toString().asBuffer())
    column.setValue(value.toString().asBuffer())
    column.timestamp = timestamp
    column
}

はい、ここでもByteBuffer使いまくりです。

ちなみに、今回のサンプルで追加している3つのカラムはまったく関係性がないので、RDBMSトランザクションとは違って、2カラムはうまく追加できたけど、3カラム目で失敗したりすると、ふつうに2カラムは追加されたまま残ります。

Cassandraって、そういうものですよね。

これを実行した後にcassandra-cliで見ると、こういう結果になります。

[default@Room] list Users;
Using default limit of 100
Using default column limit of 100
-------------------
RowKey: 1
=> (column=age, value=20, timestamp=1367135687162000)
=> (column=name, value=Suzuki Taro, timestamp=1367135687162000)
=> (column=occupation, value=System Engineer, timestamp=1367135687162000)

1 Row Returned.
Elapsed time: 100 msec(s).

Cassandra.Client#batch_mutate

insertと違って、複数カラムを1回のオペレーションで登録するメソッドです。

が、これが面倒で…。

とりあえず、サンプルとしてはこんな感じで。

    // 複数カラムの一括登録
    client.batch_mutate(Mutation.columnsMap('2',
                                            columnFamilyName,
                                            [Column.create('name', 'Tanaka Jiro', timestamp),
                                             Column.create('age', 22, timestamp),
                                             Column.create('occupation', 'Programmer', timestamp)]),
                        ConsistencyLevel.ALL)

登録するColumnを生成してListにしている分はいいのですが、

                                            [Column.create('name', 'Tanaka Jiro', timestamp),
                                             Column.create('age', 22, timestamp),
                                             Column.create('occupation', 'Programmer', timestamp)]),

batch_mutateメソッドの引数は、こんなシグネチャなので

public void batch_mutate(java.util.Map<java.nio.ByteBuffer,java.util.Map<java.lang.String,java.util.List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)

カラムファミリの名前をキーにして、MutationのListにしたMapを、さらにロウキーのByteBufferをキーにしたMapとして作成する必要があります。

なので、Mutationにこういうstaticメソッドを追加しました。

Mutation.metaClass {
    'static' {
        columnsMap << { key, columnFamilyName, columns ->
            def mutations = []
            for (column in columns) {
                def colOrSuper = new ColumnOrSuperColumn().setColumn(column)
                def mutation = new Mutation()
                mutation.column_or_supercolumn = colOrSuper
                mutations << mutation
            }
            def map = [: ]
            def mutationMap = [: ]
            mutationMap[columnFamilyName] = mutations
            map[key.asBuffer()] = mutationMap
            map
        }

        deleteColumnsMap << { key, columnFamilyName, columnNames, timestamp ->
            // 省略
        }
    }
}

普通に書いていたら、これだけでコードが埋まります…。実行結果は、insertのものとそんなに変わらないので割愛。

Cassandra.Client#get

単一のカラムを取得するためのメソッドです。

使い方の例は、こんな感じ。

    // 単一カラムを取得
    def targetColumnPath = new ColumnPath(columnFamilyName)
    targetColumnPath.setColumn('name'.asBuffer())
    def colOrSupCol = client.get('1'.asBuffer(),
                               targetColumnPath,
                               ConsistencyLevel.ALL)
    colOrSupCol.column.with {
        println('get one coloumn: [name: value] => [' +
                    new String(name, StandardCharsets.UTF_8) + ': ' +
                    new String(value, StandardCharsets.UTF_8) + ']')
    }

呼び出しの際には、ロウキーのByteBuffer、ColumnPath、ConsistencyLevelが必要です。

    def colOrSupCol = client.get('1'.asBuffer(),
                               targetColumnPath,
                               ConsistencyLevel.ALL)

ColumnPathは、カラムファミリ名とカラム名のByteBufferを設定します。

    def targetColumnPath = new ColumnPath(columnFamilyName)
    targetColumnPath.setColumn('name'.asBuffer())

Cassandra.Client#getの戻り値はColumnOrSuperColumクラスのインスタンスになっているので、今回はColumnOrSuperColunn#getColumnで中身のColumnを取り出しています。
*今回は、スーパーカラムは試していません

    colOrSupCol.column.with {
        println('get one coloumn: [name: value] => [' +
                    new String(name, StandardCharsets.UTF_8) + ': ' +
                    new String(value, StandardCharsets.UTF_8) + ']')

Columnからは、nameやvalueが取れますがbyte配列なので、Stringにデコードしています…。

先のinsertメソッドの例で登録したデータに対して実行すると、こんな感じになります。

get one coloumn: [name: value] => [name: Suzuki Taro]

Cassandra.Client#get_slice

複数カラムを一括で取得するためには、get_sliceメソッドを使用します。

    // 複数カラムを一括取得
    def predicate = new SlicePredicate()
    def range = new SliceRange(ByteBuffer.wrap(new byte[0]),  // Start
                               ByteBuffer.wrap(new byte[0]),  // Finish
                               false,  // Reverse
                               100) // Count
    predicate.slice_range = range
    def colomnOrSuperColumns = client.get_slice('1'.asBuffer(),
                                                columnParent,
                                                predicate,
                                                ConsistencyLevel.ALL)
    for (columnOrSuperColumn in colomnOrSuperColumns) {
        def column = columnOrSuperColumn.column
        println('get multi column: [name: value] => [' +
                    new String(column.name, StandardCharsets.UTF_8) + ': ' +
                    new String(column.value, StandardCharsets.UTF_8) + ']')
    }

まずは、SlicePredicateとSliceRangeのインスタンスを作成します。

    def predicate = new SlicePredicate()
    def range = new SliceRange(ByteBuffer.wrap(new byte[0]),  // Start
                               ByteBuffer.wrap(new byte[0]),  // Finish
                               false,  // Reverse
                               100) // Count
    predicate.slice_range = range

ここでSliceRangeのコンストラクタの値は、それぞれ

  • 取得するカラム(名)の範囲・開始
  • 取得するカラム(名)の範囲・終了
  • カラムのソート順(trueは逆順)
  • 取得するカラムの数

となっています。なんか、これを設定してカラムファミリに設定していたcomparatorとかの意味がわかってきましたね…。

あとは、ロウキーのByteBufferとColumnParent、そしてSliceRangeを設定したSlicePredicate、ConsistencyLevelを渡してget_sliceを呼び出します。

    def colomnOrSuperColumns = client.get_slice('1'.asBuffer(),
                                                columnParent,
                                                predicate,
                                                ConsistencyLevel.ALL)
    for (columnOrSuperColumn in colomnOrSuperColumns) {
        def column = columnOrSuperColumn.column
        println('get multi column: [name: value] => [' +
                    new String(column.name, StandardCharsets.UTF_8) + ': ' +
                    new String(column.value, StandardCharsets.UTF_8) + ']')
    }

戻り値は、ColumnOrSuperColumnのListなので、必要に応じて中身を抜き出します。

ロウキーを指定することからも明らかですが、取得できるのは「複数ロウ」ではなく、「複数カラム」です。
*複数ロウを取る場合は、multiget_sliceメソッドを使用するようですが今回は試していません

先のbatch_mutateの例で登録したデータに対して実行すると、こんな結果になります。

get multi column: [name: value] => [age: 20]
get multi column: [name: value] => [name: Suzuki Taro]
get multi column: [name: value] => [occupation: System Engineer]

ここで、ちょっとSliceRangeの設定を変えてみましょう。例えば、Reverseをtrueにすると

    def range = new SliceRange(ByteBuffer.wrap(new byte[0]),  // Start
                               ByteBuffer.wrap(new byte[0]),  // Finish
                               true,  // Reverse
                               100) // Count

取得結果が逆順になります。

get multi column: [name: value] => [occupation: System Engineer]
get multi column: [name: value] => [name: Suzuki Taro]
get multi column: [name: value] => [age: 20]

Countを1にすると

    def range = new SliceRange(ByteBuffer.wrap(new byte[0]),  // Start
                               ByteBuffer.wrap(new byte[0]),  // Finish
                               false,  // Reverse
                               1) // Count

取れるColumnOrSuperColumnがひとつになります。

get multi column: [name: value] => [age: 20]

カラム名を明示的に絞り込むと

    def range = new SliceRange('name'.asBuffer(),  // Start
                               'occupation'.asBuffer(),  // Finish
                               false,  // Reverse
                               100) // Count

取得できるカラムは、その範囲内になります。

get multi column: [name: value] => [name: Suzuki Taro]
get multi column: [name: value] => [occupation: System Engineer]

なお、最初の例でStartとFinishに指定していた

    def range = new SliceRange(ByteBuffer.wrap(new byte[0]),  // Start
                               ByteBuffer.wrap(new byte[0]),  // Finish

というのは、「全カラム」という意味になっています。nullなどの指定は不可だそうです。

Cassandra.Client#get_count

カラム数を取得するためのメソッドです。ここでも、SlicePredicateとSliceRangeを指定します。

    // カラム数の取得
    def predicateForCount = new SlicePredicate()
    def rangeForCount = new SliceRange(ByteBuffer.wrap(new byte[0]),
                                       ByteBuffer.wrap(new byte[0]),
                                       false,
                                       100)
    predicateForCount.slice_range = rangeForCount
    def count = client.get_count('1'.asBuffer(),
                                 columnParent,
                                 predicateForCount,
                                 ConsistencyLevel.ALL)
    println("column count => $count")

SlicePredicateで指定した、カラムの範囲の数が取れるってことですね。先のinsertの例で実行した結果にしての実行結果は、こんな感じです。

column count => 3

Cassandra.Client#remove

特定のカラムを削除するには、removeメソッドを使用します。

ロウキーのByteBuffer、それから削除したいカラムの名前(ByteBuffer)を設定したColumnPath、タイムスタンプとConsistencyLevelを指定して実行します。

    // 特定のカラムの削除
    client.remove('1'.asBuffer(),
                  new ColumnPath(columnFamilyName).setColumn('occupation'.asBuffer()),
                  System.currentTimeMillis() * 1000,
                  ConsistencyLevel.ALL)

この例では、「occupation」という名前のカラムが削除されます。

また、removeでロウキーに紐付くカラムを一括削除することもできます。

    client.remove('2'.asBuffer(),
                  new ColumnPath(columnFamilyName),
                  System.currentTimeMillis() * 1000,
                  ConsistencyLevel.ALL)

ColumnPathに、カラム名を設定しなければOKです。これで、全カラムが消えます。

が、なぜかロウキーは残ってしまいますが…。

Cassandra.Client#batch_mutate

複数のカラムを一括で削除するもうひとつの方法として、登録時と同じbatch_mutateメソッドを使うやり方があります。

    // 名前を指定して、複数カラムの一括削除
    // が、ロウは残る…
    client.batch_mutate(Mutation.deleteColumnsMap('2',
                                                  columnFamilyName,
                                                  ['name', 'age', 'occupation'],
                                                  System.currentTimeMillis() * 1000),
                        ConsistencyLevel.ALL)

batch_mutateメソッドは、カラムの登録時と全く同じものを使うので、組み立てるMutationの設定が変わります。

こちらが、MutationのExpandMetaClassに追加した削除用のMutationを含んだMapを作成するメソッドです。

        deleteColumnsMap << { key, columnFamilyName, columnNames, timestamp ->
            def predicate = new SlicePredicate().setColumn_names(columnNames.collect { it.asBuffer() })
            def deletion = new Deletion()
            deletion.predicate = predicate
            deletion.timestamp = timestamp

            def mutation = new Mutation()
            mutation.deletion = deletion

            def map = [: ]
            def mutationMap = [: ]
            mutationMap[columnFamilyName] = [mutation]
            map[key.asBuffer()] = mutationMap
            map
        }

削除するカラム名を指定してSlicePredicateを作成し、それを設定したDeletionを作成するところがポイントです。

            def predicate = new SlicePredicate().setColumn_names(columnNames.collect { it.asBuffer() })
            def deletion = new Deletion()
            deletion.predicate = predicate
            deletion.timestamp = timestamp

            def mutation = new Mutation()
            mutation.deletion = deletion

あとは、MutationにもDeletionを設定します。

ただ、この実装だとカラムは確かに全部消えたのですが、ロウキーは残ったままでした…。

また、SlicePredicateを使うのでSliceRangeを使ったカラム名の範囲指定ができるはずなのですが(ByteBuffer.wrap(new byte[0])を指定する)、これを実行するとエラーになりました…。

    // *batch_mutateでは、名前の範囲指定による一括削除は1.2.4でも不可
    // Caught: InvalidRequestException(why:Deletion does not yet support SliceRange predicates.)
    // InvalidRequestException(why:Deletion does not yet support SliceRange predicates.)

Cassandra.Client#truncate

指定したカラムファミリ名に属するデータを全て削除します。

client.truncate(columnFamilyName)

なんか、ちょっとずつわかってきた気がしますね。

しかし、APIが独特過ぎてとっつきにくいですね…。Thriftを使うと、こんな感じになるんでしょうか?当面は素のAPIを触ろうと思っているので、ハイレベルなクライアントは使わないつもりですが…いやー、大変。

今回のエントリを書くにあたり、こちらのサイトを参考にさせていただきました。
http://www.ne.jp/asahi/hishidama/home/tech/apache/cassandra/java.html

使っているクラスのコンストラクタや一部メソッドのシグニチャが、Cassanraのバージョンアップと共にだいぶ変わっていましたが、だいたいの内容は今でも通用します。非常に助かりました。