前回は、CassandraのThriftサンプルを意味はあまり考えずにGroovyに書き直したものでしたが、今回はちょっと意味の理解も踏まえて書いてみることにしました。
まずは、キースペースとカラムファミリの用意。
[default@unknown] create keyspace Room; 7b6e7ec9-d6ef-3d5e-9e54-53784a03e59b [default@unknown] use Room; Authenticated to keyspace: Room [default@Room] create column family Users ... with key_validation_class = 'UTF8Type' ... and comparator = 'UTF8Type' ... and default_validation_class = 'UTF8Type'; 98bb2038-8b70-3a06-b77f-2d148941fee2
はい。
続いて、Groovy側ですが今度は冗長に書いていくのは嫌だったので、少しExpandMetaClassを使ったショートカットを追加しました。
Stringからbyte配列やByteBufferの変更が多かったり
String.metaClass { asBinary << { getBytes(StandardCharsets.UTF_8) } asBuffer << { ByteBuffer.wrap(asBinary()) } }
Cassandraへの接続を隠しておいたり
Cassandra.Client.metaClass { 'static' { openWith << { conn, cls -> def transport = new TFramedTransport(new TSocket(conn['host'], conn['port'])) def protocol = new TBinaryProtocol(transport) def client = new Cassandra.Client(protocol) transport.open() try { client.set_keyspace(conn['keyspace']) cls(client) } finally { transport.close() } } } }
Columnのインスタンス生成って、けっこう面倒ですよねーとか
Column.metaClass.static.create << { name, value, timestamp -> def column = new Column(name.toString().asBuffer()) column.setValue(value.toString().asBuffer()) column.timestamp = timestamp column }
Mutationの作成なんて、素で書くと辛そうな感じだったので…。
Mutation.metaClass { 'static' { columnsMap << { key, columnFamilyName, columns -> def mutations = [] for (column in columns) { def colOrSuper = new ColumnOrSuperColumn().setColumn(column) def mutation = new Mutation() mutation.column_or_supercolumn = colOrSuper mutations << mutation } def map = [: ] def mutationMap = [: ] mutationMap[columnFamilyName] = mutations map[key.asBuffer()] = mutationMap map } deleteColumnsMap << { key, columnFamilyName, columnNames, timestamp -> def predicate = new SlicePredicate().setColumn_names(columnNames.collect { it.asBuffer() }) def deletion = new Deletion() deletion.predicate = predicate deletion.timestamp = timestamp def mutation = new Mutation() mutation.deletion = deletion def map = [: ] def mutationMap = [: ] mutationMap[columnFamilyName] = [mutation] map[key.asBuffer()] = mutationMap map } } }
ここから書くサンプルには、以下のimport文が入っているものとします。
import java.nio.ByteBuffer import java.nio.charset.StandardCharsets @Grab('org.apache.cassandra:cassandra-all:1.2.4') import org.apache.cassandra.thrift.Cassandra import org.apache.cassandra.thrift.Column import org.apache.cassandra.thrift.ColumnOrSuperColumn import org.apache.cassandra.thrift.ColumnParent import org.apache.cassandra.thrift.ColumnPath import org.apache.cassandra.thrift.ConsistencyLevel import org.apache.cassandra.thrift.Deletion import org.apache.cassandra.thrift.KeyRange import org.apache.cassandra.thrift.KeySlice import org.apache.cassandra.thrift.Mutation import org.apache.cassandra.thrift.SlicePredicate import org.apache.cassandra.thrift.SliceRange import org.apache.thrift.transport.TTransport import org.apache.thrift.transport.TFramedTransport import org.apache.thrift.transport.TSocket import org.apache.thrift.protocol.TProtocol import org.apache.thrift.protocol.TBinaryProtocol
で、先ほどCassandra.Clientに追加した、openWithメソッドに渡すClosureとして処理を書いていきます。
Cassandra.Client.openWith([host: 'localhost', port: 9160, keyspace: 'Room']) { client -> // ここに、Cassandra.Clientを使った処理を書く }
Closureの引数としては、Cassandra.Clientのインスタンスが渡ってくるようになっています。
カラムファミリは「Users」で、対応するColumnParent、そして登録処理で使うタイムスタンプはあらかじめ作っておきます。
def columnFamilyName = 'Users' def columnParent = new ColumnParent(columnFamilyName) def timestamp = System.currentTimeMillis() * 1000
ちなみに、タイムスタンプはナノ秒で保存しているそうなので、一応1000倍しています。
以下のColumnの項目を参照
http://wiki.apache.org/cassandra/DataModel
Cassandra.Client#insert
カラムをひとつずつ登録するメソッドです。
// カラムをひとつずつ登録 client.insert('1'.asBuffer(), columnParent, Column.create('name', 'Suzuki Taro', timestamp), ConsistencyLevel.ALL) client.insert('1'.asBuffer(), columnParent, Column.create('age', 20, timestamp), ConsistencyLevel.ALL) client.insert('1'.asBuffer(), columnParent, Column.create('occupation', 'System Engineer', timestamp), ConsistencyLevel.ALL)
引数には、それぞれロウキー、ColumnParent、Column、ConsistencyLevelを設定します。ConsistencyLevelは、今回のサンプルでは全てもっとも整合性の高いALLを使っています。
StringにasBufferメソッドを追加しているので、
'1'.asBuffer()
みたいにちょっとだけ端折れています。また、Columnのインスタンスを作成するのは、ホントなら以下のように書くので、それだけで行数取るんですよね…。
Column.metaClass.static.create << { name, value, timestamp -> def column = new Column(name.toString().asBuffer()) column.setValue(value.toString().asBuffer()) column.timestamp = timestamp column }
はい、ここでもByteBuffer使いまくりです。
ちなみに、今回のサンプルで追加している3つのカラムはまったく関係性がないので、RDBMSのトランザクションとは違って、2カラムはうまく追加できたけど、3カラム目で失敗したりすると、ふつうに2カラムは追加されたまま残ります。
Cassandraって、そういうものですよね。
これを実行した後にcassandra-cliで見ると、こういう結果になります。
[default@Room] list Users; Using default limit of 100 Using default column limit of 100 ------------------- RowKey: 1 => (column=age, value=20, timestamp=1367135687162000) => (column=name, value=Suzuki Taro, timestamp=1367135687162000) => (column=occupation, value=System Engineer, timestamp=1367135687162000) 1 Row Returned. Elapsed time: 100 msec(s).
Cassandra.Client#batch_mutate
insertと違って、複数カラムを1回のオペレーションで登録するメソッドです。
が、これが面倒で…。
とりあえず、サンプルとしてはこんな感じで。
// 複数カラムの一括登録 client.batch_mutate(Mutation.columnsMap('2', columnFamilyName, [Column.create('name', 'Tanaka Jiro', timestamp), Column.create('age', 22, timestamp), Column.create('occupation', 'Programmer', timestamp)]), ConsistencyLevel.ALL)
登録するColumnを生成してListにしている分はいいのですが、
[Column.create('name', 'Tanaka Jiro', timestamp), Column.create('age', 22, timestamp), Column.create('occupation', 'Programmer', timestamp)]),
batch_mutateメソッドの引数は、こんなシグネチャなので
public void batch_mutate(java.util.Map<java.nio.ByteBuffer,java.util.Map<java.lang.String,java.util.List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
カラムファミリの名前をキーにして、MutationのListにしたMapを、さらにロウキーのByteBufferをキーにしたMapとして作成する必要があります。
なので、Mutationにこういうstaticメソッドを追加しました。
Mutation.metaClass { 'static' { columnsMap << { key, columnFamilyName, columns -> def mutations = [] for (column in columns) { def colOrSuper = new ColumnOrSuperColumn().setColumn(column) def mutation = new Mutation() mutation.column_or_supercolumn = colOrSuper mutations << mutation } def map = [: ] def mutationMap = [: ] mutationMap[columnFamilyName] = mutations map[key.asBuffer()] = mutationMap map } deleteColumnsMap << { key, columnFamilyName, columnNames, timestamp -> // 省略 } } }
普通に書いていたら、これだけでコードが埋まります…。実行結果は、insertのものとそんなに変わらないので割愛。
Cassandra.Client#get
単一のカラムを取得するためのメソッドです。
使い方の例は、こんな感じ。
// 単一カラムを取得 def targetColumnPath = new ColumnPath(columnFamilyName) targetColumnPath.setColumn('name'.asBuffer()) def colOrSupCol = client.get('1'.asBuffer(), targetColumnPath, ConsistencyLevel.ALL) colOrSupCol.column.with { println('get one coloumn: [name: value] => [' + new String(name, StandardCharsets.UTF_8) + ': ' + new String(value, StandardCharsets.UTF_8) + ']') }
呼び出しの際には、ロウキーのByteBuffer、ColumnPath、ConsistencyLevelが必要です。
def colOrSupCol = client.get('1'.asBuffer(), targetColumnPath, ConsistencyLevel.ALL)
ColumnPathは、カラムファミリ名とカラム名のByteBufferを設定します。
def targetColumnPath = new ColumnPath(columnFamilyName) targetColumnPath.setColumn('name'.asBuffer())
Cassandra.Client#getの戻り値はColumnOrSuperColumクラスのインスタンスになっているので、今回はColumnOrSuperColunn#getColumnで中身のColumnを取り出しています。
*今回は、スーパーカラムは試していません
colOrSupCol.column.with { println('get one coloumn: [name: value] => [' + new String(name, StandardCharsets.UTF_8) + ': ' + new String(value, StandardCharsets.UTF_8) + ']')
Columnからは、nameやvalueが取れますがbyte配列なので、Stringにデコードしています…。
先のinsertメソッドの例で登録したデータに対して実行すると、こんな感じになります。
get one coloumn: [name: value] => [name: Suzuki Taro]
Cassandra.Client#get_slice
複数カラムを一括で取得するためには、get_sliceメソッドを使用します。
// 複数カラムを一括取得 def predicate = new SlicePredicate() def range = new SliceRange(ByteBuffer.wrap(new byte[0]), // Start ByteBuffer.wrap(new byte[0]), // Finish false, // Reverse 100) // Count predicate.slice_range = range def colomnOrSuperColumns = client.get_slice('1'.asBuffer(), columnParent, predicate, ConsistencyLevel.ALL) for (columnOrSuperColumn in colomnOrSuperColumns) { def column = columnOrSuperColumn.column println('get multi column: [name: value] => [' + new String(column.name, StandardCharsets.UTF_8) + ': ' + new String(column.value, StandardCharsets.UTF_8) + ']') }
まずは、SlicePredicateとSliceRangeのインスタンスを作成します。
def predicate = new SlicePredicate() def range = new SliceRange(ByteBuffer.wrap(new byte[0]), // Start ByteBuffer.wrap(new byte[0]), // Finish false, // Reverse 100) // Count predicate.slice_range = range
ここでSliceRangeのコンストラクタの値は、それぞれ
- 取得するカラム(名)の範囲・開始
- 取得するカラム(名)の範囲・終了
- カラムのソート順(trueは逆順)
- 取得するカラムの数
となっています。なんか、これを設定してカラムファミリに設定していたcomparatorとかの意味がわかってきましたね…。
あとは、ロウキーのByteBufferとColumnParent、そしてSliceRangeを設定したSlicePredicate、ConsistencyLevelを渡してget_sliceを呼び出します。
def colomnOrSuperColumns = client.get_slice('1'.asBuffer(), columnParent, predicate, ConsistencyLevel.ALL) for (columnOrSuperColumn in colomnOrSuperColumns) { def column = columnOrSuperColumn.column println('get multi column: [name: value] => [' + new String(column.name, StandardCharsets.UTF_8) + ': ' + new String(column.value, StandardCharsets.UTF_8) + ']') }
戻り値は、ColumnOrSuperColumnのListなので、必要に応じて中身を抜き出します。
ロウキーを指定することからも明らかですが、取得できるのは「複数ロウ」ではなく、「複数カラム」です。
*複数ロウを取る場合は、multiget_sliceメソッドを使用するようですが今回は試していません
先のbatch_mutateの例で登録したデータに対して実行すると、こんな結果になります。
get multi column: [name: value] => [age: 20]
get multi column: [name: value] => [name: Suzuki Taro]
get multi column: [name: value] => [occupation: System Engineer]
ここで、ちょっとSliceRangeの設定を変えてみましょう。例えば、Reverseをtrueにすると
def range = new SliceRange(ByteBuffer.wrap(new byte[0]), // Start ByteBuffer.wrap(new byte[0]), // Finish true, // Reverse 100) // Count
取得結果が逆順になります。
get multi column: [name: value] => [occupation: System Engineer] get multi column: [name: value] => [name: Suzuki Taro] get multi column: [name: value] => [age: 20]
Countを1にすると
def range = new SliceRange(ByteBuffer.wrap(new byte[0]), // Start ByteBuffer.wrap(new byte[0]), // Finish false, // Reverse 1) // Count
取れるColumnOrSuperColumnがひとつになります。
get multi column: [name: value] => [age: 20]
カラム名を明示的に絞り込むと
def range = new SliceRange('name'.asBuffer(), // Start 'occupation'.asBuffer(), // Finish false, // Reverse 100) // Count
取得できるカラムは、その範囲内になります。
get multi column: [name: value] => [name: Suzuki Taro] get multi column: [name: value] => [occupation: System Engineer]
なお、最初の例でStartとFinishに指定していた
def range = new SliceRange(ByteBuffer.wrap(new byte[0]), // Start ByteBuffer.wrap(new byte[0]), // Finish
というのは、「全カラム」という意味になっています。nullなどの指定は不可だそうです。
Cassandra.Client#get_count
カラム数を取得するためのメソッドです。ここでも、SlicePredicateとSliceRangeを指定します。
// カラム数の取得 def predicateForCount = new SlicePredicate() def rangeForCount = new SliceRange(ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]), false, 100) predicateForCount.slice_range = rangeForCount def count = client.get_count('1'.asBuffer(), columnParent, predicateForCount, ConsistencyLevel.ALL) println("column count => $count")
SlicePredicateで指定した、カラムの範囲の数が取れるってことですね。先のinsertの例で実行した結果にしての実行結果は、こんな感じです。
column count => 3
Cassandra.Client#remove
特定のカラムを削除するには、removeメソッドを使用します。
ロウキーのByteBuffer、それから削除したいカラムの名前(ByteBuffer)を設定したColumnPath、タイムスタンプとConsistencyLevelを指定して実行します。
// 特定のカラムの削除 client.remove('1'.asBuffer(), new ColumnPath(columnFamilyName).setColumn('occupation'.asBuffer()), System.currentTimeMillis() * 1000, ConsistencyLevel.ALL)
この例では、「occupation」という名前のカラムが削除されます。
また、removeでロウキーに紐付くカラムを一括削除することもできます。
client.remove('2'.asBuffer(), new ColumnPath(columnFamilyName), System.currentTimeMillis() * 1000, ConsistencyLevel.ALL)
ColumnPathに、カラム名を設定しなければOKです。これで、全カラムが消えます。
が、なぜかロウキーは残ってしまいますが…。
Cassandra.Client#batch_mutate
複数のカラムを一括で削除するもうひとつの方法として、登録時と同じbatch_mutateメソッドを使うやり方があります。
// 名前を指定して、複数カラムの一括削除 // が、ロウは残る… client.batch_mutate(Mutation.deleteColumnsMap('2', columnFamilyName, ['name', 'age', 'occupation'], System.currentTimeMillis() * 1000), ConsistencyLevel.ALL)
batch_mutateメソッドは、カラムの登録時と全く同じものを使うので、組み立てるMutationの設定が変わります。
こちらが、MutationのExpandMetaClassに追加した削除用のMutationを含んだMapを作成するメソッドです。
deleteColumnsMap << { key, columnFamilyName, columnNames, timestamp -> def predicate = new SlicePredicate().setColumn_names(columnNames.collect { it.asBuffer() }) def deletion = new Deletion() deletion.predicate = predicate deletion.timestamp = timestamp def mutation = new Mutation() mutation.deletion = deletion def map = [: ] def mutationMap = [: ] mutationMap[columnFamilyName] = [mutation] map[key.asBuffer()] = mutationMap map }
削除するカラム名を指定してSlicePredicateを作成し、それを設定したDeletionを作成するところがポイントです。
def predicate = new SlicePredicate().setColumn_names(columnNames.collect { it.asBuffer() }) def deletion = new Deletion() deletion.predicate = predicate deletion.timestamp = timestamp def mutation = new Mutation() mutation.deletion = deletion
あとは、MutationにもDeletionを設定します。
ただ、この実装だとカラムは確かに全部消えたのですが、ロウキーは残ったままでした…。
また、SlicePredicateを使うのでSliceRangeを使ったカラム名の範囲指定ができるはずなのですが(ByteBuffer.wrap(new byte[0])を指定する)、これを実行するとエラーになりました…。
// *batch_mutateでは、名前の範囲指定による一括削除は1.2.4でも不可 // Caught: InvalidRequestException(why:Deletion does not yet support SliceRange predicates.) // InvalidRequestException(why:Deletion does not yet support SliceRange predicates.)
Cassandra.Client#truncate
指定したカラムファミリ名に属するデータを全て削除します。
client.truncate(columnFamilyName)
なんか、ちょっとずつわかってきた気がしますね。
しかし、APIが独特過ぎてとっつきにくいですね…。Thriftを使うと、こんな感じになるんでしょうか?当面は素のAPIを触ろうと思っているので、ハイレベルなクライアントは使わないつもりですが…いやー、大変。
今回のエントリを書くにあたり、こちらのサイトを参考にさせていただきました。
http://www.ne.jp/asahi/hishidama/home/tech/apache/cassandra/java.html
使っているクラスのコンストラクタや一部メソッドのシグニチャが、Cassanraのバージョンアップと共にだいぶ変わっていましたが、だいたいの内容は今でも通用します。非常に助かりました。