CLOVER🍀

That was when it all began.

DataStaxのCQL Java Driverを使ってみる

ちょっと前に、DataStaxからCQL Java Driverのリリースがブログにポストされていたので、使ってみることにしました。

The native CQL Java driver goes GA
http://www.datastax.com/dev/blog/the-native-cql-java-driver-goes-ga

Today DataStax announces the general availability of the native CQL driver for Java. This is a production-ready driver for Cassandra 1.2+ with no legacy baggage from Thrift or JDBC concepts that don’t translate well to Cassandra.

Cassandra 1.2以上で使用できて、Thrift APIや今の(Cassandra向けの?)JDBCを置き換えるものになるんでしょうか?

特徴としては、

・Out-of-the-box best practices for node discovery, load balancing and fail over
・An asynchronous architecture provides simpler concurrency without any thread pool tuning
・Tracing support
・CQL with prepared statements is about 10% faster than Thrift, and we expect that gap to widen as the native protocol matures

  • ノードディスカバリのベストプラクティス、ロードバランスとフェイルオーバー
  • スレッドプールのチューニングなしでの、単純化された並行性の非同期アーキテクチャの提供
  • Tracingのサポート
  • Thriftよりも10%早いPreparedStatementの提供

という感じらしいです。ドキュメントは、こちら。

Document
http://www.datastax.com/doc-source/developer/java-driver/index.html

Javadoc
http://www.datastax.com/drivers/java/apidocs/

クラスの中にStatementとかPreparedStatementとかResultSetとかありますが、JDBCとは別物です。

このドライバを使うためには、Cassandra 1.2で導入されたbinary protocolを使用するようです。cassandra.yamlの以下の項目をtrueに設定するようにしてください。と。

start_native_transport: true

Cassandra 1.2.5では、デフォルトでtrueになっていました。

では、使ってみましょう。

今回もGroovyで書くので、依存関係の解決にはGrapeを使います。

@Grab('com.datastax.cassandra:cassandra-driver-core:1.0.0')

ドライバはMaven Central Repositoryに登録されているので、楽ですね。

Cassandraに接続するには、Clusterクラスを使用するようです。これは、Cluster.Builderを使って組み立てるみたいです。

def cluster = Cluster.builder().addContactPoint('localhost').withPort(9042).build()

ポートは指定しなくてもいいのですが、今回は明示的に。Thrift APIとは違うポートですね。

Cassadraの起動ログでは、それぞれ以下のように出ていました。

 INFO 17:57:29,710 Starting listening for CQL clients on localhost/127.0.0.1:9042...
 INFO 17:57:29,751 Binding thrift service to localhost/127.0.0.1:9160

上がCQLクライアントのためのリッスンポート、下がThrift APIのためのリッスンポートですね。

Clusterは、最後にshutdownするようにします。

    cluster.shutdown()

接続したら、チュートリアルにならってMetadataを取得してみました。

    def metadata = cluster.getMetadata()
    println("Connected to cluster: ${metadata.clusterName}")

    for (host in metadata.allHosts) {
        println("Datacenter: $host.datacenter, Host: $host.address, Rack: $host.rack")
    }

相手にしているのがデフォルトのクラスタの場合、こんな出力が出ると思います。

Connected to cluster: Test Cluster
Datacenter: datacenter1, Host: localhost/127.0.0.1, Rack: rack1

続いて、CQLを投げてみましょう。

今回使うのも、これまでと同じキースペース、テーブルです。

$ cqlsh
Connected to Test Cluster at localhost:9160.
[cqlsh 3.0.2 | Cassandra 1.2.5 | CQL spec 3.0.0 | Thrift protocol 19.36.0]
Use HELP for help.
cqlsh> USE cqldemo;
cqlsh:cqldemo> SELECT * FROM Books;

 isbn13         | price | publish_date             | title
----------------+-------+--------------------------+-----------------------------------------
 978-4798128436 |  3360 | 2013-01-16 00:00:00+0900 | Cassandra実用システムインテグレーション
 978-4873115290 |  3570 | 2011-12-24 00:00:00+0900 |                               Cassandra

キースペースを指定して接続するには、Cluster#connect(String)を使用します。戻り値はSessionで、これを使って後でCQLを発行します。

    def session = cluster.connect('cqldemo')

キースペースを指定しなくても接続できますが、その場合は自分でUSE [keyspace]しなくてはいけないと思います。

SELECT文を投げてみましょう。

    for (row in session.execute('SELECT * FROM Books')) {
        println("isbn13: ${row.getString('isbn13')}, " +
                    "price: ${row.getInt('price')}, " + 
                    "publish_date: ${row.getDate('publish_date')}, " +
                    "title: ${row.getString('title')}")
    }

Session#executeでCQLを実行することができ、戻り値はResultSetになります。ResultSetといっても、JDBCとはまったく関係がありません。また、Iterableを実装しているので、上記のようにfor文などでイテレーションすることができます。

Rowクラスには、getStringやgetDateなど各種値を取得するメソッドが用意されています。この例ではカラム名を指定して値を取り出していますが、JDBCのようにインデックス指定で取得することもできます。

    for (row in session.execute('SELECT * FROM Books')) {
        println("isbn13: ${row.getString(0)}, " +
                    "price: ${row.getInt(1)}, " + 
                    "publish_date: ${row.getDate(2)}, " +
                    "title: ${row.getString(3)}")
    }

0オリジンですけどね…。

実行結果は、こちら。

isbn13: 978-4798128436, price: 3360, publish_date: Wed Jan 16 00:00:00 JST 2013, title: Cassandra実用システムインテグレーション
isbn13: 978-4873115290, price: 3570, publish_date: Sat Dec 24 00:00:00 JST 2011, title: Cassandra

あとは、PreparedStatementを使ってみましょう。Session#prepareでPreparedStatementのインスタンスを作成することができます。

    def preparedStatement = session.prepare('SELECT * FROM Books WHERE isbn13 = ?')

バインド対象の変数は、JDBCと同じで「?」で指定します。

が、この後が全然違いまして…PreparedStatementに値をバインドするには、BoundStatementというので包む様です。そして、BoundStatement#bindで値を設定します。

    def boundStatement = new BoundStatement(preparedStatement)

    println('----- Execute PreparedStatement -----')
    for (row in session.execute(boundStatement.bind('978-4798128436'))) {
        println("isbn13: ${row.getString('isbn13')}, " +
                    "price: ${row.getInt('price')}, " + 
                    "publish_date: ${row.getDate('publish_date')}, " +
                    "title: ${row.getString('title')}")
    }

BoundStatement#bindは可変長引数を取るので、「?」の数だけ値を渡してあげればいいみたいです。

もしくは、BoundStatement#setStringとかで値を設定してもOKです。

boundStatement.setString(0, '978-4798128436')

やっぱり、0オリジンですが…。

こちらの、実行結果。なんてことはありませんが。

isbn13: 978-4798128436, price: 3360, publish_date: Wed Jan 16 00:00:00 JST 2013, title: Cassandra実用システムインテグレーション

とりあえず、こんなところで。

今回作成したサンプル全体は、こんな感じです。ドライバが依存しているCassandraのバージョンが1.2.3だったので、ちょっと上げておきました。

@Grab('com.datastax.cassandra:cassandra-driver-core:1.0.0')
@Grab('org.apache.cassandra:cassandra-all:1.2.5')
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.Host
import com.datastax.driver.core.Metadata

import com.datastax.driver.core.BoundStatement
import com.datastax.driver.core.PreparedStatement

def cluster = Cluster.builder().addContactPoint('localhost').withPort(9042).build()

try {
    def metadata = cluster.getMetadata()
    println("Connected to cluster: ${metadata.clusterName}")

    for (host in metadata.allHosts) {
        println("Datacenter: $host.datacenter, Host: $host.address, Rack: $host.rack")
    }

    def session = cluster.connect('cqldemo')

    println('----- Execute Statement -----')
    for (row in session.execute('SELECT * FROM Books')) {
        println("isbn13: ${row.getString('isbn13')}, " +
                    "price: ${row.getInt('price')}, " + 
                    "publish_date: ${row.getDate('publish_date')}, " +
                    "title: ${row.getString('title')}")
        /*
        println("isbn13: ${row.getString(0)}, " +
                    "price: ${row.getInt(1)}, " + 
                    "publish_date: ${row.getDate(2)}, " +
                    "title: ${row.getString(3)}")
        */
    }

    def preparedStatement = session.prepare('SELECT * FROM Books WHERE isbn13 = ?')
    def boundStatement = new BoundStatement(preparedStatement)

    println('----- Execute PreparedStatement -----')
    for (row in session.execute(boundStatement.bind('978-4798128436'))) {
    // for (row in session.execute(boundStatement.setString(0, '978-4798128436'))) {
        println("isbn13: ${row.getString('isbn13')}, " +
                    "price: ${row.getInt('price')}, " + 
                    "publish_date: ${row.getDate('publish_date')}, " +
                    "title: ${row.getString('title')}")
    }
} finally {
    cluster.shutdown()
}