CLOVER🍀

That was when it all began.

Reactor Kafkaで遊ぶ

ReactorのApache Kafka向けのライブラリとして、Reactor Kafkaというものがあります。

GitHub - reactor/reactor-kafka: Reactive Kafka Driver with Reactor

こちらを使って、Apache Kafkaにアクセスしてみましょう。

Reactor Kafka?

Reactor Kafkaは、Apache Kafka向けのリアクティブなAPIを提供します。

Reactive API for Kafka

Apache Kafka向けの、Non BlockingでリアクティブなAPIとして使えます、と。

他のApache Kafka向けのAPIと比べてどうなんですか、というところですが、リアクティブ/Non Blockingなうえで、
ファンクショナルなスタイルで書けるよ、データ変換などが入る場合にReactor Kafkaを使うのがオススメだよ、
という感じみたいです。

Comparisons with other Kafka APIs

Kafka Producer and Consumer APIs
Kafka Connect API
Kafka Streams API

特に、外部システム/リソースにリアクティブに使えるコネクタがあり、そのデータを使った変換処理などがある場合には
採用を検討した方がいいよ、というところですね。

まあ、とりあえず使ってみて感覚をつかんでいってみましょう。

準備

Maven依存関係には、ReactorのBOMがあるのでまずはこちらを設定します。

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

Reactor Kafkaの依存関係を追加。

        <dependency>
            <groupId>io.projectreactor.kafka</groupId>
            <artifactId>reactor-kafka</artifactId>
        </dependency>

ここで依存関係に追加されるReactor Kafkaのバージョンは、「1.0.0.RELEASE」となります。また、このReactor Kafkaが
依存しているApache Kafkaのクライアントライブラリのバージョンは「0.11.0.0」です。

あとは、テストライブラリを追加。

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.0.3</version>
            <scope>test</scope>
        </dependency>

テストコードを書きつつ、確認していきます。

BrokerとしてのApache Kafkaは、1.0.0を使用しました。Topicは、テストコードごとに用意するものとします。
Replication Factorは1、Partition数も1で。

テストコードの雛形

テストコードの雛形は、このようなものを用意。
src/test/java/org/littlewings/reactor/kafka/ReactorKafkaTest.java

package org.littlewings.reactor.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.TransactionManager;
import reactor.test.StepVerifier;
import reactor.util.concurrent.Queues;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class ReactorKafkaTest {
    // ここに、テストコードを書く!
}

以降は、この中に

を書いていってみます。

なお、Reactor Kafkaにはサンプルコードもあるので、こちらも適宜見ておくとよいでしょう。
https://github.com/reactor/reactor-kafka/tree/v1.0.0.RELEASE/reactor-kafka-samples/src/main/java/reactor/kafka/samples

簡単にメッセージの送受信

サンプルコードは、こちら。

    @Test
    public void gettingStarted() {
        String brokerConnectString = "172.25.0.3:9092";
        String topicName = "reactor-kafka-topic";

        Map<String, Object> producerProperties = new HashMap<>();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        SenderOptions<String, String> senderOptions =
                SenderOptions.<String, String>create(producerProperties).maxInFlight(Queues.SMALL_BUFFER_SIZE);

        KafkaSender<String, String> sender = KafkaSender.create(senderOptions);

        Flux<SenderRecord<String, String, Integer>> sendMessages =
                Flux
                        .range(1, 10)
                        .map(i -> SenderRecord.create(topicName, null, null, "key" + i, "value" + i, i));
        sender
                .send(sendMessages)
                .subscribe();

        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions
                        .<String, String>create(consumerProperties)
                        .subscription(Collections.singletonList(topicName));

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);

        Flux<ReceiverRecord<String, String>> receiveMessages = receiver.receive();

        Mono<List<Tuple2<String, String>>> collectedMessages =
                receiveMessages
                        .buffer(10)
                        .map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
                        .next();

        StepVerifier
                .create(collectedMessages)
                .expectNext(IntStream.rangeClosed(1, 10).mapToObj(i -> Tuples.of("key" + i, "value" + i)).collect(Collectors.toList()))
                .verifyComplete();

        sender.close();
    }

Reactor Kafkaでのメッセージの送受信には、それぞれKafkaSender、KafkaReceiverを使います。

Reactive Kafka Sender

Reactive Kafka Receiver

まずはKafkaSenderの作成から。SenderOptionsを作成してから、KafkaSenderを作成する流れになります。

        String brokerConnectString = "172.25.0.3:9092";
        String topicName = "reactor-kafka-topic";

        Map<String, Object> producerProperties = new HashMap<>();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        SenderOptions<String, String> senderOptions =
                SenderOptions.<String, String>create(producerProperties).maxInFlight(Queues.SMALL_BUFFER_SIZE);

        KafkaSender<String, String> sender = KafkaSender.create(senderOptions);

SenderOptionsの作成方法には、java.util.Properties、java.util.Mapを使用する方法、メソッドチェーンで組めるものなど
いくつかあるのですが、今回はMapを使うもので書いています。他のパターンは、以降でちょこちょこと切り替えて出していきます。

基本的に設定できるのは、Apache KafkaのProducerConfigのものになります。

        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

それとは別に、Reactor Kafkaとしての設定も加えることが可能です。

        SenderOptions<String, String> senderOptions =
                SenderOptions.<String, String>create(producerProperties).maxInFlight(Queues.SMALL_BUFFER_SIZE);

これでSenderOptionsが作成できたら、KafkaSenderを作成することができます。

        KafkaSender<String, String> sender = KafkaSender.create(senderOptions);

で、送信するメッセージをPublisherとして放り込みます。メッセージは、SenderRecord、もしくはProducerRecordの形で作成することができます。

        Flux<SenderRecord<String, String, Integer>> sendMessages =
                Flux
                        .range(1, 10)
                        .map(i -> SenderRecord.create(topicName, null, null, "key" + i, "value" + i, i));

SenderRecordを使う場合は、SenderRecord#create時にメタデータを登録することができます。ここでは、ループを回している時のカウンタ値をそのまま
入れています。

ここで作成したPublisher(今回はFlux)をKafkaSender#sendに渡し、subscribeすることでメッセージが送信されます。subscribeしないと、何も起こりません…。

        sender
                .send(sendMessages)
                .subscribe();

使い終わったら、KafkaSenderはcloseします。

        sender.close();

KafkaReceiverも、ReceiverOptionsを作成してから、KafkaReceiverを作成する流れになります。指定するオプションは、Apache KafkaのConsumerConfigのものが
基本になります。

        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions
                        .<String, String>create(consumerProperties)
                        .subscription(Collections.singletonList(topicName));

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);

どのTopicを対象とするかは、ReceiverOptionsでのsubscriptionとして指定します。

メッセージはKafkaReceiver#receiveで開始され、今回は10個のメッセージを受け取ったら完了としました。

        Flux<ReceiverRecord<String, String>> receiveMessages = receiver.receive();

        Mono<List<Tuple2<String, String>>> collectedMessages =
                receiveMessages
                        .buffer(10)
                        .map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
                        .next();

        StepVerifier
                .create(collectedMessages)
                .expectNext(IntStream.rangeClosed(1, 10).mapToObj(i -> Tuples.of("key" + i, "value" + i)).collect(Collectors.toList()))
                .verifyComplete();

まずは、こんなところで。

コミット制御してみる

続いて、今度はコミット制御してみましょう。

手動コミット

最初は、手動コミットで。

こちらを参考にするとよいです。

Controlling commit frequency

    @Test
    public void withManualCommit() {
        String brokerConnectString = "172.25.0.3:9092";
        String topicName = "reactor-kafka-with-commit-topic1";

        Map<String, Object> producerProperties = new HashMap<>();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        SenderOptions<String, String> senderOptions =
                SenderOptions.create(producerProperties);

        KafkaSender<String, String> sender = KafkaSender.create(senderOptions);

        Flux<ProducerRecord<String, String>> sendMessages =
                Flux
                        .range(1, 10)
                        .map(i -> new ProducerRecord<>(topicName, "key" + i, "value" + i));
        sender
                .createOutbound()
                .send(sendMessages)
                .then()
                .subscribe();

        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions
                        .<String, String>create(consumerProperties)
                        .subscription(Collections.singletonList(topicName));

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);

        Flux<ReceiverRecord<String, String>> receiveMessages = receiver.receive();

        Mono<List<Tuple2<String, String>>> collectedMessages =
                receiveMessages
                        .doOnNext(m -> m.receiverOffset().commit().block())
                        .buffer(10)
                        .map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
                        .next();

        StepVerifier
                .create(collectedMessages)
                .expectNext(IntStream.rangeClosed(1, 10).mapToObj(i -> Tuples.of("key" + i, "value" + i)).collect(Collectors.toList()))
                .verifyComplete();

        sender.close();
    }

さっきの例からしょうもなく変えた部分として、今回はメタデータは要らないかなぁと思い、ProducerRecordを使うようにしました。

        Flux<ProducerRecord<String, String>> sendMessages =
                Flux
                        .range(1, 10)
                        .map(i -> new ProducerRecord<>(topicName, "key" + i, "value" + i));
        sender
                .createOutbound()
                .send(sendMessages)
                .then()
                .subscribe();

この場合、KafkaSender側はKafkaSender#createOutboundしてからメッセージを渡すことになります。
Send without result metadata

        sender
                .createOutbound()
                .send(sendMessages)
                .then()
                .subscribe();

ポイントは、KafkaSender(KafkaConsumer)の設定として、オートコミットをオフにしているということと、

        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions
                        .<String, String>create(consumerProperties)
                        .subscription(Collections.singletonList(topicName));

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);

KafkaReceriver側で、メッセージを受け取ったらReceiverOffset#commitを呼び出していることです。

        Flux<ReceiverRecord<String, String>> receiveMessages = receiver.receive();

        Mono<List<Tuple2<String, String>>> collectedMessages =
                receiveMessages
                        .doOnNext(m -> m.receiverOffset().commit().block())
                        .buffer(10)
                        .map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
                        .next();
バッチサイズでのコミット

続いて、バッチサイズでのコミットを行っていきます。

    @Test
    public void withBatchCommit() {
        String brokerConnectString = "172.25.0.3:9092";
        String topicName = "reactor-kafka-with-commit-topic2";

        Map<String, Object> producerProperties = new HashMap<>();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        SenderOptions<String, String> senderOptions =
                SenderOptions.create(producerProperties);

        KafkaSender<String, String> sender = KafkaSender.create(senderOptions);

        Flux<ProducerRecord<String, String>> sendMessages =
                Flux
                        .range(1, 10)
                        .map(i -> new ProducerRecord<>(topicName, "key" + i, "value" + i));
        sender
                .createOutbound()
                .send(sendMessages)
                .then()
                .subscribe();

        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions
                        .<String, String>create(consumerProperties)
                        .commitBatchSize(5)
                        .subscription(Collections.singletonList(topicName));

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
        Flux<Flux<ConsumerRecord<String, String>>> receiveMessages = receiver.receiveAutoAck();

        Mono<List<Tuple2<String, String>>> collectedMessages =
                receiveMessages
                        .flatMap(Function.identity())
                        .buffer(10)
                        .map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
                        .next();

        /*
        Flux<ReceiverRecord<String, String>> receiveMessages = receiver.receive();
        Mono<List<Tuple2<String, String>>> collectedMessages =
                receiveMessages
                        .doOnNext(m -> m.receiverOffset().acknowledge())
                        .buffer(10)
                        .map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
                        .next();
        */

        StepVerifier
                .create(collectedMessages)
                .expectNext(IntStream.rangeClosed(1, 10).mapToObj(i -> Tuples.of("key" + i, "value" + i)).collect(Collectors.toList()))
                .verifyComplete();

        sender.close();
    }

KafkaSenderは変わらないので、割愛。

KafkaReceiver側では、ReceriverOptionsを作成する際に、commitBatchSizeを指定します。今回は、メッセージ数が10ですが、5で設定。

        ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions
                        .<String, String>create(consumerProperties)
                        .commitBatchSize(5)
                        .subscription(Collections.singletonList(topicName));

で、メッセージを受け取った際に、バッチサイズに達したらコミットして欲しいのですが、ここでackが必要になります。今回は、AutoAckを使いました。
Auto-acknowledgement of batches of records
>|java||
Flux>> receiveMessages = receiver.receiveAutoAck();

Mono>> collectedMessages =
receiveMessages
.flatMap(Function.identity())
.buffer(10)
.map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
.next();
|

これを使うと、受信するメッセージが入れ子になったFluxになるので、ちょっと注意しましょう。

AutoAckを使わない場合は、自分でackするように実装するのではないかと思います。

        Flux<ReceiverRecord<String, String>> receiveMessages = receiver.receive();
        Mono<List<Tuple2<String, String>>> collectedMessages =
                receiveMessages
                        .doOnNext(m -> m.receiverOffset().acknowledge())
                        .buffer(10)
                        .map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
                        .next();

これで、オートコミットをオフにした時の、コミットのやり方がわかります、と。

トランザクションを使う

最後に、トランザクションを使ってみましょう。

KafkaSender#sendTransactionallyを使う

まずは、簡単にKafkaSender#sendTransactionallyを使ってトランザクショナルなメッセージ送信を行ってみます。
Transactional send

サンプルコードは、このように。

    @Test
    public void transactionally() {
        String brokerConnectString = "172.25.0.3:9092";
        String topicName = "reactor-kafka-transactional-topic1";

        SenderOptions<String, String> senderOptions =
                SenderOptions
                        .<String, String>create()
                        .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
                        .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                        .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                        .producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-x");

        KafkaSender<String, String> sender = KafkaSender.create(senderOptions);

        Flux<ProducerRecord<String, String>> sendMessages =
                Flux
                        .range(1, 10)
                        .map(i -> new ProducerRecord<>(topicName, "key" + i, "value" + i));

        sender
                .createOutbound()
                .sendTransactionally(Mono.just(sendMessages))
                .then()
                .subscribe();

        ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions
                        .<String, String>create()
                        .consumerProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
                        .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group")
                        .consumerProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                        .consumerProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                        .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                        .consumerProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
                        .consumerProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
                        .commitBatchSize(5)
                        .subscription(Collections.singletonList(topicName));

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);

        Flux<Flux<ConsumerRecord<String, String>>> receiveMessages = receiver.receiveAutoAck();

        Mono<List<Tuple2<String, String>>> collectedMessages =
                receiveMessages
                        .flatMap(Function.identity())
                        .buffer(10)
                        .map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
                        .next();

        StepVerifier
                .create(collectedMessages)
                .expectNext(IntStream.rangeClosed(1, 10).mapToObj(i -> Tuples.of("key" + i, "value" + i)).collect(Collectors.toList()))
                .verifyComplete();

        sender.close();
    }

SenderOptionsには、transactional.idの指定が必要になります。このことから、裏でApache Kafkaのトランザクションが使われていることがわかります。

        SenderOptions<String, String> senderOptions =
                SenderOptions
                        .<String, String>create()
                        .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
                        .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                        .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                        .producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-x");

なお、SenderOptionsを作成する時のAPIは、メソッドチェーンの方に変えました。オートコミットはオフにして、バッチサイズによるコミット方針でいきます。

        ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions
                        .<String, String>create()
                        .consumerProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
                        .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group")
                        .consumerProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                        .consumerProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                        .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                        .consumerProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
                        .consumerProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
                        .commitBatchSize(5)
                        .subscription(Collections.singletonList(topicName));

isolation.levelは「read_committed」にしています。

ここでのポイントは、メッセージの送信時に、KafkaSender#sendTransactionallyを使うことですね。

        sender
                .createOutbound()
                .sendTransactionally(Mono.just(sendMessages))
                .then()
                .subscribe();
TransactionManagerを使う

続いて、TransactionManagerを使ってみます。

こちらについてはドキュメントに記載がないですが、Javadocを見るのがよいかと。
https://projectreactor.io/docs/kafka/1.0.0.RELEASE/api/reactor/kafka/sender/TransactionManager.html

APIの説明を見ると、どれもKafkaProducerのトランザクション系のメソッドを見てね、という感じになっています。

サンプルはこちら。

    @Test
    public void withTransactionManager() {
        String brokerConnectString = "172.25.0.3:9092";
        String topicName = "reactor-kafka-transactional-topic2";

        SenderOptions<String, String> senderOptions =
                SenderOptions
                        .<String, String>create()
                        .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
                        .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                        .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                        .producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-x");

        KafkaSender<String, String> sender = KafkaSender.create(senderOptions);

        Flux<SenderRecord<String, String, Integer>> sendMessages =
                Flux
                        .range(1, 10)
                        .map(i -> SenderRecord.create(new ProducerRecord<>(topicName, "key" + i, "value" + i), i));

        TransactionManager transactionManager = sender.transactionManager();

        transactionManager
                .begin()
                .then(sender.send(sendMessages).next())
                .then(transactionManager.commit())
                .subscribe();

        ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions
                        .<String, String>create()
                        .consumerProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
                        .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group")
                        .consumerProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                        .consumerProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                        .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                        .consumerProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
                        .consumerProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
                        .commitBatchSize(5)
                        .subscription(Collections.singletonList(topicName));

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);

        Flux<Flux<ConsumerRecord<String, String>>> receiveMessages = receiver.receiveAutoAck();

        Mono<List<Tuple2<String, String>>> collectedMessages =
                receiveMessages
                        .flatMap(Function.identity())
                        .buffer(10)
                        .map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
                        .next();

        StepVerifier
                .create(collectedMessages)
                .expectNext(IntStream.rangeClosed(1, 10).mapToObj(i -> Tuples.of("key" + i, "value" + i)).collect(Collectors.toList()))
                .verifyComplete();

        sender.close();
    }

ここでのポイントは、KafkaSender#transactionManagerでTransactionManagerを取得し、

        TransactionManager transactionManager = sender.transactionManager();

メッセージの送信にTransactionManagerを絡めるところになります(begin/commit)。

        transactionManager
                .begin()
                .then(sender.send(sendMessages).next())
                .then(transactionManager.commit())
                .subscribe();
TransactionManagerを使う(abort)

最後に、トランザクションをabortするケースを。

先のTransactionManagerを使ったコードと、そう変わりません。

    @Test
    public void abortTransaction() {
        String brokerConnectString = "172.25.0.3:9092";
        String topicName = "reactor-kafka-transactional-topic3";

        SenderOptions<String, String> senderOptions =
                SenderOptions
                        .<String, String>create()
                        .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
                        .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                        .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                        .producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-x");

        KafkaSender<String, String> sender = KafkaSender.create(senderOptions);

        Flux<SenderRecord<String, String, Integer>> sendMessages =
                Flux
                        .range(1, 10)
                        .map(i -> SenderRecord.create(new ProducerRecord<>(topicName, "key" + i, "value" + i), i));

        TransactionManager transactionManager = sender.transactionManager();

        transactionManager
                .begin()
                .then(sender.send(sendMessages).next())
                .then(transactionManager.abort())
                .subscribe();

        ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions
                        .<String, String>create()
                        .consumerProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString)
                        .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "reactor-kafka-group")
                        .consumerProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                        .consumerProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                        .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                        .consumerProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
                        .consumerProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
                        .commitBatchSize(5)
                        .subscription(Collections.singletonList(topicName));

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);

        Flux<Flux<ConsumerRecord<String, String>>> receiveMessages = receiver.receiveAutoAck();

        Mono<List<Tuple2<String, String>>> collectedMessages =
                receiveMessages
                        .timeout(Duration.ofSeconds(5L))
                        .flatMap(Function.identity())
                        .buffer(10)
                        .map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
                        .next();

        StepVerifier
                .create(collectedMessages)
                .expectError(TimeoutException.class)
                .verify();

        sender.close();
    }

TransactionManager#commitの代わりに、TransactionManager#abortを使用しているだけです。

        transactionManager
                .begin()
                .then(sender.send(sendMessages).next())
                .then(transactionManager.abort())
                .subscribe();

この場合、メッセージが入らないのでKafkaReceiver側ではメッセージが流れてこなくなります。

        Flux<Flux<ConsumerRecord<String, String>>> receiveMessages = receiver.receiveAutoAck();

        Mono<List<Tuple2<String, String>>> collectedMessages =
                receiveMessages
                        .timeout(Duration.ofSeconds(5L))
                        .flatMap(Function.identity())
                        .buffer(10)
                        .map(messages -> messages.stream().map(m -> Tuples.of(m.key(), m.value())).collect(Collectors.toList()))
                        .next();

        StepVerifier
                .create(collectedMessages)
                .expectError(TimeoutException.class)
                .verify();

とりあえず、こんなところでしょうか?

やらなかったこと

こういう類のAPIの使い方の流れで、やらなかったことがあります。

こちら。
Exactly-once delivery

サンプルコードの意味がまだよくわからないのと、だいぶ長くなってきたので今回はパスです。

もうちょっと中身を

とはいうものの、ここまでの範囲で気になるところは追ってみましょう。

Reactor Kafkaとはいえ、中身にはApache KafkaのProducer/Consumerを使用しています。
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java#L88
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java#L110

メッセージを送る時はProducer#sendを使いますし
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java#L265-L279

受け取る時はConsumer#pollです。
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java#L446

これらに対して、Subscriberなどの形でReactorのAPIに統合しています。
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java#L341
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java#L420

コミットについては、最終的にConsumer#commitAsyncに行き着きます。
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java#L707-L713
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java#L740-L745
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java#L504

また、バッチサイズでのコミットを行うにあたりのackですが、手動で行う場合はここで進んだオフセットと設定したバッチサイズの比較が行われ、
コミットするかどうかを判断します(CommittableOffset#acknowledge)。
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java#L715-L721

つまり、ackを使わないとバッチサイズだけを設定しても意味がないと…。AutoAckを使用する場合についても、CommittableOffset#acknowledgeが使われます。
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java#L157

TransactionManagerについても、Producerのトランザクション系のAPIが呼び出されます。
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java#L469-L513

なお、transactional.idを設定して、トランザクションを有効化すると自動でProducer#initTransactionsが実行されます。
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java#L89-L92

KafkaSender#sendTransactionallyについても、やっぱり内部的にはProducerのトランザクション系のAPIに行き着きますね。
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java#L124-L133
https://github.com/reactor/reactor-kafka/blob/v1.0.0.RELEASE/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java#L188-L199

とりあえず、気になるところはこのあたりでしょうか。

まとめ

Reactor Kafkaを、ちょっと使って遊んでみました。

Apache Kafkaのクライアントライブラリの上に構築されているので、そちらの方の使い方や用語も理解しておいた方が助けになりそうですね。ちょっとずつ
理解を深めていきましょう。