ReactorのApache Kafka向けのライブラリとして、Reactor Kafkaというものがあります。
GitHub - reactor/reactor-kafka: Reactive Kafka Driver with Reactor
こちらを使って、Apache Kafkaにアクセスしてみましょう。
準備
Maven依存関係には、ReactorのBOMがあるのでまずはこちらを設定します。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Bismuth-SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
Reactor Kafkaの依存関係を追加。
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
ここで依存関係に追加されるReactor Kafkaのバージョンは、「1.0.0.RELEASE」となります。また、このReactor Kafkaが
依存しているApache Kafkaのクライアントライブラリのバージョンは「0.11.0.0」です。
あとは、テストライブラリを追加。
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.0.3</version>
<scope>test</scope>
</dependency>
テストコードを書きつつ、確認していきます。
BrokerとしてのApache Kafkaは、1.0.0を使用しました。Topicは、テストコードごとに用意するものとします。
Replication Factorは1、Partition数も1で。
テストコードの雛形
テストコードの雛形は、このようなものを用意。
src/test/java/org/littlewings/reactor/kafka/ReactorKafkaTest.java
package org.littlewings.reactor.kafka;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.TransactionManager;
import reactor.test.StepVerifier;
import reactor.util.concurrent.Queues;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class ReactorKafkaTest {
}
以降は、この中に
- とりあえずメッセージ送受信してみるサンプル
- コミット付き(手動)
- コミット付き(バッチサイズ - ack)
- トランザクション付き
- トランザクション付き(TransactionManager使用)
- トランザクション付き - abort(TransactionManager使用)
を書いていってみます。
なお、Reactor Kafkaにはサンプルコードもあるので、こちらも適宜見ておくとよいでしょう。
https://github.com/reactor/reactor-kafka/tree/v1.0.0.RELEASE/reactor-kafka-samples/src/main/java/reactor/kafka/samples
簡単にメッセージの送受信
サンプルコードは、こちら。
@Test
public void gettingStarted() {
String brokerConnectString = "172.25.0.3:9092";
String topicName = "reactor-kafka-topic";
Map<String, Object> producerProperties = new HashMap<>();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptions =
SenderOptions.<String, String>create(producerProperties).maxInFlight(Queues.SMALL_BUFFER_SIZE);
KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
Flux<SenderRecord<String, String, Integer>> sendMessages =
Flux
.range(1, 10)
.map(i -> SenderRecord.create(topicName, null, null, "key" + i, "value" + i, i));
sender
.send(sendMessages)
.subscribe();
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ReceiverOptions<String, String> receiverOptions =
ReceiverOptions
.<String, String>create(consumerProperties)
.subscription(Collections.singletonList(topicName));
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
Flux<ReceiverRecord<String, String>> receiveMessages = receiver.receive();
Mono<List<Tuple2<String, String>>> collectedMessages =
receiveMessages
.buffer(10)
.map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
.next();
StepVerifier
.create(collectedMessages)
.expectNext(IntStream.rangeClosed(1, 10).mapToObj(i -> Tuples.of("key" + i, "value" + i)).collect(Collectors.toList()))
.verifyComplete();
sender.close();
}
Reactor Kafkaでのメッセージの送受信には、それぞれKafkaSender、KafkaReceiverを使います。
Reactive Kafka Sender
Reactive Kafka Receiver
まずはKafkaSenderの作成から。SenderOptionsを作成してから、KafkaSenderを作成する流れになります。
String brokerConnectString = "172.25.0.3:9092";
String topicName = "reactor-kafka-topic";
Map<String, Object> producerProperties = new HashMap<>();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptions =
SenderOptions.<String, String>create(producerProperties).maxInFlight(Queues.SMALL_BUFFER_SIZE);
KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
SenderOptionsの作成方法には、java.util.Properties、java.util.Mapを使用する方法、メソッドチェーンで組めるものなど
いくつかあるのですが、今回はMapを使うもので書いています。他のパターンは、以降でちょこちょこと切り替えて出していきます。
基本的に設定できるのは、Apache KafkaのProducerConfigのものになります。
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
それとは別に、Reactor Kafkaとしての設定も加えることが可能です。
SenderOptions<String, String> senderOptions =
SenderOptions.<String, String>create(producerProperties).maxInFlight(Queues.SMALL_BUFFER_SIZE);
これでSenderOptionsが作成できたら、KafkaSenderを作成することができます。
KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
で、送信するメッセージをPublisherとして放り込みます。メッセージは、SenderRecord、もしくはProducerRecordの形で作成することができます。
Flux<SenderRecord<String, String, Integer>> sendMessages =
Flux
.range(1, 10)
.map(i -> SenderRecord.create(topicName, null, null, "key" + i, "value" + i, i));
SenderRecordを使う場合は、SenderRecord#create時にメタデータを登録することができます。ここでは、ループを回している時のカウンタ値をそのまま
入れています。
ここで作成したPublisher(今回はFlux)をKafkaSender#sendに渡し、subscribeすることでメッセージが送信されます。subscribeしないと、何も起こりません…。
sender
.send(sendMessages)
.subscribe();
使い終わったら、KafkaSenderはcloseします。
sender.close();
KafkaReceiverも、ReceiverOptionsを作成してから、KafkaReceiverを作成する流れになります。指定するオプションは、Apache KafkaのConsumerConfigのものが
基本になります。
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ReceiverOptions<String, String> receiverOptions =
ReceiverOptions
.<String, String>create(consumerProperties)
.subscription(Collections.singletonList(topicName));
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
どのTopicを対象とするかは、ReceiverOptionsでのsubscriptionとして指定します。
メッセージはKafkaReceiver#receiveで開始され、今回は10個のメッセージを受け取ったら完了としました。
Flux<ReceiverRecord<String, String>> receiveMessages = receiver.receive();
Mono<List<Tuple2<String, String>>> collectedMessages =
receiveMessages
.buffer(10)
.map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
.next();
StepVerifier
.create(collectedMessages)
.expectNext(IntStream.rangeClosed(1, 10).mapToObj(i -> Tuples.of("key" + i, "value" + i)).collect(Collectors.toList()))
.verifyComplete();
まずは、こんなところで。
コミット制御してみる
続いて、今度はコミット制御してみましょう。
手動コミット
最初は、手動コミットで。
こちらを参考にするとよいです。
Controlling commit frequency
@Test
public void withManualCommit() {
String brokerConnectString = "172.25.0.3:9092";
String topicName = "reactor-kafka-with-commit-topic1";
Map<String, Object> producerProperties = new HashMap<>();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptions =
SenderOptions.create(producerProperties);
KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
Flux<ProducerRecord<String, String>> sendMessages =
Flux
.range(1, 10)
.map(i -> new ProducerRecord<>(topicName, "key" + i, "value" + i));
sender
.createOutbound()
.send(sendMessages)
.then()
.subscribe();
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
ReceiverOptions<String, String> receiverOptions =
ReceiverOptions
.<String, String>create(consumerProperties)
.subscription(Collections.singletonList(topicName));
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
Flux<ReceiverRecord<String, String>> receiveMessages = receiver.receive();
Mono<List<Tuple2<String, String>>> collectedMessages =
receiveMessages
.doOnNext(m -> m.receiverOffset().commit().block())
.buffer(10)
.map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
.next();
StepVerifier
.create(collectedMessages)
.expectNext(IntStream.rangeClosed(1, 10).mapToObj(i -> Tuples.of("key" + i, "value" + i)).collect(Collectors.toList()))
.verifyComplete();
sender.close();
}
さっきの例からしょうもなく変えた部分として、今回はメタデータは要らないかなぁと思い、ProducerRecordを使うようにしました。
Flux<ProducerRecord<String, String>> sendMessages =
Flux
.range(1, 10)
.map(i -> new ProducerRecord<>(topicName, "key" + i, "value" + i));
sender
.createOutbound()
.send(sendMessages)
.then()
.subscribe();
この場合、KafkaSender側はKafkaSender#createOutboundしてからメッセージを渡すことになります。
Send without result metadata
sender
.createOutbound()
.send(sendMessages)
.then()
.subscribe();
ポイントは、KafkaSender(KafkaConsumer)の設定として、オートコミットをオフにしているということと、
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
ReceiverOptions<String, String> receiverOptions =
ReceiverOptions
.<String, String>create(consumerProperties)
.subscription(Collections.singletonList(topicName));
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
KafkaReceriver側で、メッセージを受け取ったらReceiverOffset#commitを呼び出していることです。
Flux<ReceiverRecord<String, String>> receiveMessages = receiver.receive();
Mono<List<Tuple2<String, String>>> collectedMessages =
receiveMessages
.doOnNext(m -> m.receiverOffset().commit().block())
.buffer(10)
.map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
.next();
バッチサイズでのコミット
続いて、バッチサイズでのコミットを行っていきます。
@Test
public void withBatchCommit() {
String brokerConnectString = "172.25.0.3:9092";
String topicName = "reactor-kafka-with-commit-topic2";
Map<String, Object> producerProperties = new HashMap<>();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptions =
SenderOptions.create(producerProperties);
KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
Flux<ProducerRecord<String, String>> sendMessages =
Flux
.range(1, 10)
.map(i -> new ProducerRecord<>(topicName, "key" + i, "value" + i));
sender
.createOutbound()
.send(sendMessages)
.then()
.subscribe();
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
ReceiverOptions<String, String> receiverOptions =
ReceiverOptions
.<String, String>create(consumerProperties)
.commitBatchSize(5)
.subscription(Collections.singletonList(topicName));
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
Flux<Flux<ConsumerRecord<String, String>>> receiveMessages = receiver.receiveAutoAck();
Mono<List<Tuple2<String, String>>> collectedMessages =
receiveMessages
.flatMap(Function.identity())
.buffer(10)
.map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
.next();
StepVerifier
.create(collectedMessages)
.expectNext(IntStream.rangeClosed(1, 10).mapToObj(i -> Tuples.of("key" + i, "value" + i)).collect(Collectors.toList()))
.verifyComplete();
sender.close();
}
KafkaSenderは変わらないので、割愛。
KafkaReceiver側では、ReceriverOptionsを作成する際に、commitBatchSizeを指定します。今回は、メッセージ数が10ですが、5で設定。
ReceiverOptions<String, String> receiverOptions =
ReceiverOptions
.<String, String>create(consumerProperties)
.commitBatchSize(5)
.subscription(Collections.singletonList(topicName));
で、メッセージを受け取った際に、バッチサイズに達したらコミットして欲しいのですが、ここでackが必要になります。今回は、AutoAckを使いました。
Auto-acknowledgement of batches of records
>|java||
Flux>> receiveMessages = receiver.receiveAutoAck();
Mono>> collectedMessages =
receiveMessages
.flatMap(Function.identity())
.buffer(10)
.map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
.next();
|
これを使うと、受信するメッセージが入れ子になったFluxになるので、ちょっと注意しましょう。
AutoAckを使わない場合は、自分でackするように実装するのではないかと思います。
Flux<ReceiverRecord<String, String>> receiveMessages = receiver.receive();
Mono<List<Tuple2<String, String>>> collectedMessages =
receiveMessages
.doOnNext(m -> m.receiverOffset().acknowledge())
.buffer(10)
.map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
.next();
これで、オートコミットをオフにした時の、コミットのやり方がわかります、と。
最後に、トランザクションを使ってみましょう。
KafkaSender#sendTransactionallyを使う
まずは、簡単にKafkaSender#sendTransactionallyを使ってトランザクショナルなメッセージ送信を行ってみます。
Transactional send
サンプルコードは、このように。
@Test
public void transactionally() {
String brokerConnectString = "172.25.0.3:9092";
String topicName = "reactor-kafka-transactional-topic1";
SenderOptions<String, String> senderOptions =
SenderOptions
.<String, String>create()
.producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
.producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-x");
KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
Flux<ProducerRecord<String, String>> sendMessages =
Flux
.range(1, 10)
.map(i -> new ProducerRecord<>(topicName, "key" + i, "value" + i));
sender
.createOutbound()
.sendTransactionally(Mono.just(sendMessages))
.then()
.subscribe();
ReceiverOptions<String, String> receiverOptions =
ReceiverOptions
.<String, String>create()
.consumerProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
.consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group")
.consumerProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.consumerProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.consumerProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
.consumerProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
.commitBatchSize(5)
.subscription(Collections.singletonList(topicName));
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
Flux<Flux<ConsumerRecord<String, String>>> receiveMessages = receiver.receiveAutoAck();
Mono<List<Tuple2<String, String>>> collectedMessages =
receiveMessages
.flatMap(Function.identity())
.buffer(10)
.map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
.next();
StepVerifier
.create(collectedMessages)
.expectNext(IntStream.rangeClosed(1, 10).mapToObj(i -> Tuples.of("key" + i, "value" + i)).collect(Collectors.toList()))
.verifyComplete();
sender.close();
}
SenderOptionsには、transactional.idの指定が必要になります。このことから、裏でApache Kafkaのトランザクションが使われていることがわかります。
SenderOptions<String, String> senderOptions =
SenderOptions
.<String, String>create()
.producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
.producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-x");
なお、SenderOptionsを作成する時のAPIは、メソッドチェーンの方に変えました。オートコミットはオフにして、バッチサイズによるコミット方針でいきます。
ReceiverOptions<String, String> receiverOptions =
ReceiverOptions
.<String, String>create()
.consumerProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
.consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group")
.consumerProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.consumerProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.consumerProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
.consumerProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
.commitBatchSize(5)
.subscription(Collections.singletonList(topicName));
isolation.levelは「read_committed」にしています。
ここでのポイントは、メッセージの送信時に、KafkaSender#sendTransactionallyを使うことですね。
sender
.createOutbound()
.sendTransactionally(Mono.just(sendMessages))
.then()
.subscribe();
TransactionManagerを使う
続いて、TransactionManagerを使ってみます。
こちらについてはドキュメントに記載がないですが、Javadocを見るのがよいかと。
https://projectreactor.io/docs/kafka/1.0.0.RELEASE/api/reactor/kafka/sender/TransactionManager.html
APIの説明を見ると、どれもKafkaProducerのトランザクション系のメソッドを見てね、という感じになっています。
サンプルはこちら。
@Test
public void withTransactionManager() {
String brokerConnectString = "172.25.0.3:9092";
String topicName = "reactor-kafka-transactional-topic2";
SenderOptions<String, String> senderOptions =
SenderOptions
.<String, String>create()
.producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
.producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-x");
KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
Flux<SenderRecord<String, String, Integer>> sendMessages =
Flux
.range(1, 10)
.map(i -> SenderRecord.create(new ProducerRecord<>(topicName, "key" + i, "value" + i), i));
TransactionManager transactionManager = sender.transactionManager();
transactionManager
.begin()
.then(sender.send(sendMessages).next())
.then(transactionManager.commit())
.subscribe();
ReceiverOptions<String, String> receiverOptions =
ReceiverOptions
.<String, String>create()
.consumerProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
.consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group")
.consumerProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.consumerProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.consumerProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
.consumerProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
.commitBatchSize(5)
.subscription(Collections.singletonList(topicName));
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
Flux<Flux<ConsumerRecord<String, String>>> receiveMessages = receiver.receiveAutoAck();
Mono<List<Tuple2<String, String>>> collectedMessages =
receiveMessages
.flatMap(Function.identity())
.buffer(10)
.map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
.next();
StepVerifier
.create(collectedMessages)
.expectNext(IntStream.rangeClosed(1, 10).mapToObj(i -> Tuples.of("key" + i, "value" + i)).collect(Collectors.toList()))
.verifyComplete();
sender.close();
}
ここでのポイントは、KafkaSender#transactionManagerでTransactionManagerを取得し、
TransactionManager transactionManager = sender.transactionManager();
メッセージの送信にTransactionManagerを絡めるところになります(begin/commit)。
transactionManager
.begin()
.then(sender.send(sendMessages).next())
.then(transactionManager.commit())
.subscribe();
TransactionManagerを使う(abort)
最後に、トランザクションをabortするケースを。
先のTransactionManagerを使ったコードと、そう変わりません。
@Test
public void abortTransaction() {
String brokerConnectString = "172.25.0.3:9092";
String topicName = "reactor-kafka-transactional-topic3";
SenderOptions<String, String> senderOptions =
SenderOptions
.<String, String>create()
.producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
.producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-x");
KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
Flux<SenderRecord<String, String, Integer>> sendMessages =
Flux
.range(1, 10)
.map(i -> SenderRecord.create(new ProducerRecord<>(topicName, "key" + i, "value" + i), i));
TransactionManager transactionManager = sender.transactionManager();
transactionManager
.begin()
.then(sender.send(sendMessages).next())
.then(transactionManager.abort())
.subscribe();
ReceiverOptions<String, String> receiverOptions =
ReceiverOptions
.<String, String>create()
.consumerProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
.consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group")
.consumerProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.consumerProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.consumerProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
.consumerProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
.commitBatchSize(5)
.subscription(Collections.singletonList(topicName));
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
Flux<Flux<ConsumerRecord<String, String>>> receiveMessages = receiver.receiveAutoAck();
Mono<List<Tuple2<String, String>>> collectedMessages =
receiveMessages
.timeout(Duration.ofSeconds(5L))
.flatMap(Function.identity())
.buffer(10)
.map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
.next();
StepVerifier
.create(collectedMessages)
.expectError(TimeoutException.class)
.verify();
sender.close();
}
TransactionManager#commitの代わりに、TransactionManager#abortを使用しているだけです。
transactionManager
.begin()
.then(sender.send(sendMessages).next())
.then(transactionManager.abort())
.subscribe();
この場合、メッセージが入らないのでKafkaReceiver側ではメッセージが流れてこなくなります。
Flux<Flux<ConsumerRecord<String, String>>> receiveMessages = receiver.receiveAutoAck();
Mono<List<Tuple2<String, String>>> collectedMessages =
receiveMessages
.timeout(Duration.ofSeconds(5L))
.flatMap(Function.identity())
.buffer(10)
.map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
.next();
StepVerifier
.create(collectedMessages)
.expectError(TimeoutException.class)
.verify();
とりあえず、こんなところでしょうか?
まとめ
Reactor Kafkaを、ちょっと使って遊んでみました。
Apache Kafkaのクライアントライブラリの上に構築されているので、そちらの方の使い方や用語も理解しておいた方が助けになりそうですね。ちょっとずつ
理解を深めていきましょう。