CLOVER🍀

That was when it all began.

JavaクライアントからCQLを使ってみる

Cassandraを勉強しているのはいいのですが、世の中でCassandraを使っている方々のデータアクセス方法が、CQLと普通のアクセス方法のどちらがメジャーなのか、よくわからない今日この頃です。

とりあえず、両方とも進めてみるわけですが(笑)。

というわけで、JavaクライアントからCQLを実行する簡単なプログラムを書いてみました。

いわゆる「Hello World」レベルのもののはずなのに、めっちゃ苦労しましたけど…。

なお、今回はCQL3を対象とします。

まず、cqlshでキースペースとテーブルを作成します。

cqlsh> CREATE KEYSPACE cqldemo
   ... WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
cqlsh> USE cqldemo;

cqlsh:cqldemo> CREATE TABLE books (
           ... isbn13 varchar,
           ... title varchar,
           ... price int,
           ... publish_date timestamp,
           ... PRIMARY KEY(isbn13)
           ... );

データも登録しておきます。

cqlsh:cqldemo> INSERT INTO books (isbn13, title, price, publish_date)
           ... VALUES ('978-4873115290', 'Cassandra', 3570, '2011-12-24');

cqlsh:cqldemo> INSERT INTO books (isbn13, title, price, publish_date)
           ... VALUES('978-4798128436', 'Cassandra実用システムインテグレーション', 3360, '2013-01-16');

cqlsh:cqldemo> SELECT * FROM books;

 isbn13         | price | publish_date             | title
----------------+-------+--------------------------+-----------------------------------------
 978-4798128436 |  3360 | 2013-01-16 00:00:00+0900 | Cassandra実用システムインテグレーション
 978-4873115290 |  3570 | 2011-12-24 00:00:00+0900 |                               Cassandra

cqlsh:cqldemo> SELECT COUNT(*) FROM books;

 count
-------
     2

はい。

Javaからといっても、今回も変わらずGroovyからやりますが。

今回使ったimport文。

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

@Grab('org.apache.cassandra:cassandra-all:1.2.4')
import org.apache.cassandra.thrift.Cassandra
import org.apache.cassandra.thrift.Column
import org.apache.cassandra.thrift.Compression
import org.apache.cassandra.thrift.ConsistencyLevel
import org.apache.cassandra.thrift.CqlResult
import org.apache.cassandra.thrift.CqlRow

import org.apache.cassandra.db.marshal.DateType
import org.apache.cassandra.db.marshal.Int32Type
import org.apache.cassandra.db.marshal.LongType
import org.apache.cassandra.db.marshal.UTF8Type

import org.apache.thrift.transport.TTransport
import org.apache.thrift.transport.TFramedTransport
import org.apache.thrift.transport.TSocket
import org.apache.thrift.protocol.TProtocol
import org.apache.thrift.protocol.TBinaryProtocol

コード中で、頻繁にエンコーディングを指定することになるので、UTF-8の変数を宣言しておきました。

def charset = StandardCharsets.UTF_8

で、まずはお決まりのコード。キースペースを指定するところまで、とりあえず書きます。

def transport = new TFramedTransport(new TSocket('localhost', 9160))
def protocol = new TBinaryProtocol(transport)
def client = new Cassandra.Client(protocol)

transport.open()
client.set_keyspace('cqldemo')

try {
    // ここにプログラムを書く!
} finally {
    transport.close()
}

あとは、CQLを実行するだけですね。

今回実行したCQLは、

SELECT * FROM books;
SELECT COUNT(*) FROM books;

の2本です。

もっといろいろやろうと思ったのですが、これでけっこうハマってしまってですね…。

とりあえず、CQLの実行にはCassandra.Client#execute_cql3_queryメソッドを使うみたいです。

    CqlResult cqlResult = 
        client.execute_cql3_query(ByteBuffer.wrap('SELECT * FROM books'.getBytes(charset)),
                                  Compression.NONE,
                                  ConsistencyLevel.ONE)

結果は、CqlResultになります。引数は、ひとつ目にCQLをByteBufferで渡します。続いてCompression、そしてConsistencyLevelです。

ComporessionはNONEとGZIPが指定できますが、GZIPはそれを指定しただけだとコケちゃったので、他に何か必要そうな感じでした。とりあえず、今回は割愛。

CqlResultからはメタデータなども取得することができますが、最終的に欲しいのはロウとカラム、ということになります。これは、

CqlResult#getRows(): List => CqlRow#getColumns(): List => Column

といった感じで、CqlResultから遡って取得することになります。

つまり、こういう感じになるでしょうか。

    CqlResult cqlResult = 
        client.execute_cql3_query(ByteBuffer.wrap('SELECT * FROM books'.getBytes(charset)),
                                  Compression.NONE,
                                  ConsistencyLevel.ONE)

    for (CqlRow row : cqlResult.getRows()) {
        for (Column column : row.getColumns()) {
            def name = new String(column.getName(), charset)
            def value = ???
        }
    }

valueの部分を書いていませんが…これ、最初はそのままStringに使用として失敗したんですよね。

自分でテーブルを書いておいてなんですが、テーブルのカラムの型にintとtimestampが入っているので、そのままStringにしてしまうと結果がおかしくなります。

cqlsh:cqldemo> CREATE TABLE books (
           ... isbn13 varchar,
           ... title varchar,
           ... price int,
           ... publish_date timestamp,
           ... PRIMARY KEY(isbn13)
           ... );

なんですけど、Columnクラスにその辺をデコードしてくれるメソッドは存在しません。しかも、Column#getValueメソッドはbyte配列が戻ってくるだけですからね…。

え…自分でデコードしろってこと?マジ?

と思い、最初はNIOでカラム名ごとにデコードする仕掛けを作ってみました。

    def factories =
        [isbn13: { binary -> charset.decode(ByteBuffer.wrap(binary)).toString() },
         title: { binary -> charset.decode(ByteBuffer.wrap(binary)).toString() },
         price: { binary -> ByteBuffer.wrap(binary).getInt() },
         publish_date: { binary -> new Date(ByteBuffer.wrap(binary).getLong()).format('yyyy/MM/dd') }]

これを

            def name = new String(column.getName(), charset)
            def value = factories[name](column.getValue())

と適用する、みたいな。

が…やっぱりおかしい。デコードするクラスは絶対あるだろうと。

ここで、CqlResult#getSchemaの内容に注目。

    println('Schema => ' + cqlResult.getSchema())

これを出してみると…

Schema => CqlMetadata(name_types:{java.nio.HeapByteBuffer[pos=415 lim=420 cap=738]=UTF8Type, java.nio.HeapByteBuffer[pos=457 lim=463 cap=738]=UTF8Type, java.nio.HeapByteBuffer[pos=436 lim=441 cap=738]=UTF8Type, java.nio.HeapByteBuffer[pos=387 lim=399 cap=738]=UTF8Type}, value_types:{java.nio.HeapByteBuffer[pos=548 lim=553 cap=738]=org.apache.cassandra.db.marshal.Int32Type, java.nio.HeapByteBuffer[pos=655 lim=661 cap=738]=org.apache.cassandra.db.marshal.UTF8Type, java.nio.HeapByteBuffer[pos=602 lim=607 cap=738]=org.apache.cassandra.db.marshal.UTF8Type, java.nio.HeapByteBuffer[pos=488 lim=500 cap=738]=org.apache.cassandra.db.marshal.DateType}, default_name_type:UTF8Type, default_value_type:UTF8Type)

となりました。

お!型が分かってそうですね。ならこれを使って…と言いたいところですが、CqlMetadata#getValue_typesのシグニチャは

public java.util.Map<java.nio.ByteBuffer,java.lang.String> getValue_types()

Mapの値はStringなのか…。

で、各Typeはinstanceというstaticフィールドを持っているようなので、ここから名前でClassを引っ張ってきてデコードするように…

    def valueTypeMap = cqlResult.getSchema().getValue_types()
    def marshaller = { name, binary ->
        def typeClass = Cassandra.class.getClassLoader().loadClass(name)
        def field = typeClass.getField('instance')
        field.get(null).compose(ByteBuffer.wrap(binary))
    }

とか書いていて、最終的には普通に名前でマッチをかける方向に考えました。

    def namedMarshaller =
        [isbn13: { binary -> UTF8Type.instance.compose(ByteBuffer.wrap(binary)) },
         title: { binary -> UTF8Type.instance.compose(ByteBuffer.wrap(binary)) },
         price: { binary -> Int32Type.instance.compose(ByteBuffer.wrap(binary)) },
         publish_date: { binary -> DateType.instance.compose(ByteBuffer.wrap(binary)).format('yyyy/MM/dd') }]

最終的にできあがったコードは、こちらです。

transport.open()
client.set_keyspace('cqldemo')

try {
    println('Execute [SELECT * FROM books]')

    CqlResult cqlResult = 
        client.execute_cql3_query(ByteBuffer.wrap('SELECT * FROM books'.getBytes(charset)),
                                  Compression.NONE,
                                  ConsistencyLevel.ONE)

    println('Type => ' + cqlResult.getType())
    println('Rows Size => ' + cqlResult.getRowsSize())
    println('Schema => ' + cqlResult.getSchema())

    def valueTypeMap = cqlResult.getSchema().getValue_types()
    def marshaller = { name, binary ->
        def typeClass = Cassandra.class.getClassLoader().loadClass(name)
        def field = typeClass.getField('instance')
        field.get(null).compose(ByteBuffer.wrap(binary))
    }

    def namedMarshaller =
        [isbn13: { binary -> UTF8Type.instance.compose(ByteBuffer.wrap(binary)) },
         title: { binary -> UTF8Type.instance.compose(ByteBuffer.wrap(binary)) },
         price: { binary -> Int32Type.instance.compose(ByteBuffer.wrap(binary)) },
         publish_date: { binary -> DateType.instance.compose(ByteBuffer.wrap(binary)).format('yyyy/MM/dd') }]

    /*
    def factories =
        [isbn13: { binary -> charset.decode(ByteBuffer.wrap(binary)).toString() },
         title: { binary -> charset.decode(ByteBuffer.wrap(binary)).toString() },
         price: { binary -> ByteBuffer.wrap(binary).getInt() },
         publish_date: { binary -> new Date(ByteBuffer.wrap(binary).getLong()).format('yyyy/MM/dd') }]
    */

    for (CqlRow row : cqlResult.getRows()) {
        println("Key => " + new String(row.getKey(), charset))

        for (Column column : row.getColumns()) {
            def name = new String(column.getName(), charset)
            println("[name: value] => [" +
                        name +
                        ': ' +
                        namedMarshaller[name](column.getValue()) +
                        // marshaller(valueTypeMap[ByteBuffer.wrap(column.getName())], column.getValue()) +
                        // factories[name](column.getValue()) +
                        ']')
        }
    }

    println('--------------------------------------------------------')

    println('Execute [SELECT COUNT(*) FROM books]')

    cqlResult = client.execute_cql3_query(ByteBuffer.wrap('SELECT COUNT(*) FROM books'.getBytes(charset)),
                                          Compression.NONE,
                                          ConsistencyLevel.ONE)

    println("Type => " + cqlResult.getType())
    println('Rows Size => ' + cqlResult.getRowsSize())
    println('Schema => ' + cqlResult.getSchema())

    factories = [count: { binary -> ByteBuffer.wrap(binary).getInt() }]

    for (CqlRow row : cqlResult.getRows()) {
        println("Key => " + new String(row.getKey(), charset))

        for (Column column : row.getColumns()) {
            def name = new String(column.getName(), charset)
            println("[name: value] => [" +
                        name +
                        ': ' +
                        LongType.instance.compose(ByteBuffer.wrap(column.getValue())) +
                        ']')
        }
    }
} finally {
    transport.close()
}

試行錯誤の履歴も、ちょっと残しています。どれも一応動作しますが。

これを動かすと、こういう結果が得られます。

Execute [SELECT * FROM books]
Type => ROWS
Rows Size => 2
Schema => CqlMetadata(name_types:{java.nio.HeapByteBuffer[pos=415 lim=420 cap=738]=UTF8Type, java.nio.HeapByteBuffer[pos=457 lim=463 cap=738]=UTF8Type, java.nio.HeapByteBuffer[pos=436 lim=441 cap=738]=UTF8Type, java.nio.HeapByteBuffer[pos=387 lim=399 cap=738]=UTF8Type}, value_types:{java.nio.HeapByteBuffer[pos=548 lim=553 cap=738]=org.apache.cassandra.db.marshal.Int32Type, java.nio.HeapByteBuffer[pos=655 lim=661 cap=738]=org.apache.cassandra.db.marshal.UTF8Type, java.nio.HeapByteBuffer[pos=602 lim=607 cap=738]=org.apache.cassandra.db.marshal.UTF8Type, java.nio.HeapByteBuffer[pos=488 lim=500 cap=738]=org.apache.cassandra.db.marshal.DateType}, default_name_type:UTF8Type, default_value_type:UTF8Type)
Key => 
[name: value] => [isbn13: 978-4798128436]
[name: value] => [price: 3360]
[name: value] => [publish_date: 2013/01/16]
[name: value] => [title: Cassandra実用システムインテグレーション]
Key => 
[name: value] => [isbn13: 978-4873115290]
[name: value] => [price: 3570]
[name: value] => [publish_date: 2011/12/24]
[name: value] => [title: Cassandra]
--------------------------------------------------------
Execute [SELECT COUNT(*) FROM books]
Type => ROWS
Rows Size => 1
Schema => CqlMetadata(name_types:{java.nio.HeapByteBuffer[pos=108 lim=113 cap=220]=UTF8Type}, value_types:{java.nio.HeapByteBuffer[pos=138 lim=143 cap=220]=org.apache.cassandra.db.marshal.LongType}, default_name_type:UTF8Type, default_value_type:UTF8Type)
Key => 
[name: value] => [count: 2]

情報がホントに少なくて、けっこう苦労しました。しかも、今のアプローチが正しいかどうかわからない…(笑)。

まあ、もうちょっと調べてたらわかるかなぁ…。

その他、更新系を試してみたいとか、PreparedStatementみたいな機能を使ってみたいとかあるのですが、時間を使い切っちゃったので、ここまでということで。

ちなみに、CQLで登録したデータをcassandra-cliで見ると、こんな感じに表示されました。

[default@cqldemo] list books;
Using default limit of 100
Using default column limit of 100
-------------------
RowKey: 978-4798128436
=> (column=, value=, timestamp=1367235978728000)
=> (column=price, value=00000d20, timestamp=1367235978728000)
=> (column=publish_date, value=0000013c3eb95980, timestamp=1367235978728000)
=> (column=title, value=43617373616e647261e5ae9fe794a8e382b7e382b9e38386e383a0e382a4e383b3e38386e382b0e383ace383bce382b7e383a7e383b3, timestamp=1367235978728000)
-------------------
RowKey: 978-4873115290
=> (column=, value=, timestamp=1367235914407000)
=> (column=price, value=00000df2, timestamp=1367235914407000)
=> (column=publish_date, value=000001346b6f8d80, timestamp=1367235914407000)
=> (column=title, value=43617373616e647261, timestamp=1367235914407000)

2 Rows Returned.
Elapsed time: 40 msec(s).

PRIMARY KEYがロウキーになってて、名前と値がないカラムがひとつあります…。PRIMARY KEYの分が空いてるんだろうなぁ…。