CLOVER🍀

Trying Consumer Commits and Producer Transactions with Apache Kafka

When writing programs with the Apache Kafka Java client, I had always used auto commit, so I thought I'd try some other patterns.

Looking into it, it seems Consumers support commits and Producers support Transactions.

References:
Design / Message Delivery Semantics

Kafka Simple Consumer Failure Recovery

Transactions in Apache Kafka

Apache Kafka Best Practices

Kafkaコンシューマのリバランスはいつ行われるか - 見習いプログラミング日記

This time, I'd like to try these out.

Preparation

The Maven dependencies are as follows.

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>

I'll also use these test libraries.

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.0.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.9.0</version>
            <scope>test</scope>
        </dependency>

I'll assume Apache ZooKeeper and an Apache Kafka Broker are already running.

Against this environment, I'll try each of the following in turn:

  • Auto commit
  • Consumer commits
  • Producer Transactions

Each pattern will use its own dedicated topic.

Test code skeleton

Here's the test code skeleton I prepared.
src/test/java/org/littlewings/kafka/transactional/TransactionalTest.java

package org.littlewings.kafka.transactional;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TransactionalTest {
    // Tests go here!!
}

Auto Commit

First, plain old auto commit.

    @Test
    public void autoCommit() throws ExecutionException, InterruptedException {
        String brokerConnectString = "172.25.0.3:9092";

        Properties producerProperties = new Properties();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.RETRIES_CONFIG, 1);  // default "0"
        producerProperties.put(ProducerConfig.ACKS_CONFIG, "1");  // default

        String topicName = "auto-commit-topic";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            List<Future<RecordMetadata>> futures = new ArrayList<>();
            IntStream.rangeClosed(1, 10).forEach(i -> {
                String key = String.format("%s-key-%d", topicName, i);
                String value = String.format("%s-value-%d", topicName, i);
                futures.add(producer.send(new ProducerRecord<>(topicName, key, value)));
            });

            for (Future<RecordMetadata> f : futures) {
                f.get();
            }
        }

        String consumerGroup = "auto-commit-topic-group";

        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // default "latest"
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);  // default


        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Arrays.asList(topicName));

            List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

            ConsumerRecords<String, String> records = consumer.poll(3000L);
            records.forEach(consumerRecords::add);

            assertThat(consumerRecords).hasSize(10);
        }

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Arrays.asList(topicName));
            List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

            ConsumerRecords<String, String> records = consumer.poll(3000L);
            records.forEach(consumerRecords::add);

            assertThat(consumerRecords).isEmpty();
        }
    }

Here are the producer settings and the topic name. Only "retries" is changed from its default value.

        String brokerConnectString = "172.25.0.3:9092";

        Properties producerProperties = new Properties();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.RETRIES_CONFIG, 1);  // default "0"
        producerProperties.put(ProducerConfig.ACKS_CONFIG, "1");  // default

        String topicName = "auto-commit-topic";

The consumer settings look like this. Delivered messages will be read from the beginning of the topic.

        String consumerGroup = "auto-commit-topic-group";

        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // default "latest"
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);  // default

Send messages from the producer,

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            List<Future<RecordMetadata>> futures = new ArrayList<>();
            IntStream.rangeClosed(1, 10).forEach(i -> {
                String key = String.format("%s-key-%d", topicName, i);
                String value = String.format("%s-value-%d", topicName, i);
                futures.add(producer.send(new ProducerRecord<>(topicName, key, value)));
            });

            for (Future<RecordMetadata> f : futures) {
                f.get();
            }
        }

and the consumer can retrieve them; once they've all been read, subsequent polls come back empty (since no new messages have been added).

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Arrays.asList(topicName));

            List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

            ConsumerRecords<String, String> records = consumer.poll(3000L);
            records.forEach(consumerRecords::add);

            assertThat(consumerRecords).hasSize(10);
        }

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Arrays.asList(topicName));
            List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

            ConsumerRecords<String, String> records = consumer.poll(3000L);
            records.forEach(consumerRecords::add);

            assertThat(consumerRecords).isEmpty();
        }

Using Consumer Commits

Next, let's try committing on the consumer side.

    @Test
    public void commitSync() throws ExecutionException, InterruptedException {
        String brokerConnectString = "172.25.0.3:9092";

        Properties producerProperties = new Properties();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.RETRIES_CONFIG, 1);  // default "0"
        producerProperties.put(ProducerConfig.ACKS_CONFIG, "1");  // default

        String topicName = "use-commit-topic";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            List<Future<RecordMetadata>> futures = new ArrayList<>();
            IntStream.rangeClosed(1, 10).forEach(i -> {
                String key = String.format("%s-key-%d", topicName, i);
                String value = String.format("%s-value-%d", topicName, i);
                futures.add(producer.send(new ProducerRecord<>(topicName, key, value)));
            });

            for (Future<RecordMetadata> f : futures) {
                f.get();
            }
        }

        String consumerGroup = "use-commit-topic-group";

        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // default "latest"
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // default "true"

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Arrays.asList(topicName));

            List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

            ConsumerRecords<String, String> records = consumer.poll(3000L);
            records.forEach(consumerRecords::add);

            assertThat(consumerRecords).hasSize(10);
            assertThat(consumerRecords.get(0).key()).isEqualTo("use-commit-topic-key-1");
            assertThat(consumerRecords.get(1).key()).isEqualTo("use-commit-topic-key-2");
            assertThat(consumerRecords.get(2).key()).isEqualTo("use-commit-topic-key-3");
        }

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Arrays.asList(topicName));
            List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

            ConsumerRecords<String, String> records = consumer.poll(3000L);
            records.forEach(consumerRecords::add);
            consumer.commitSync();

            assertThat(consumerRecords).hasSize(10);
            assertThat(consumerRecords.get(0).key()).isEqualTo("use-commit-topic-key-1");
            assertThat(consumerRecords.get(1).key()).isEqualTo("use-commit-topic-key-2");
            assertThat(consumerRecords.get(2).key()).isEqualTo("use-commit-topic-key-3");
        }

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Arrays.asList(topicName));
            List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

            ConsumerRecords<String, String> records = consumer.poll(3000L);
            records.forEach(consumerRecords::add);
            consumer.commitSync();

            assertThat(consumerRecords).isEmpty();
        }
    }

On the producer side, only the topic name changed, so I'll skip it.

On the consumer side, first set "enable.auto.commit" to "false".

        String consumerGroup = "use-commit-topic-group";

        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // default "latest"
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // default "true"

In this state, if the consumer reads messages and is then closed without committing,

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Arrays.asList(topicName));

            List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

            ConsumerRecords<String, String> records = consumer.poll(3000L);
            records.forEach(consumerRecords::add);

            assertThat(consumerRecords).hasSize(10);
            assertThat(consumerRecords.get(0).key()).isEqualTo("use-commit-topic-key-1");
            assertThat(consumerRecords.get(1).key()).isEqualTo("use-commit-topic-key-2");
            assertThat(consumerRecords.get(2).key()).isEqualTo("use-commit-topic-key-3");
        }

the same messages can be read again. Now, if you call KafkaConsumer#commitSync here,

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Arrays.asList(topicName));
            List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

            ConsumerRecords<String, String> records = consumer.poll(3000L);
            records.forEach(consumerRecords::add);
            consumer.commitSync();

            assertThat(consumerRecords).hasSize(10);
            assertThat(consumerRecords.get(0).key()).isEqualTo("use-commit-topic-key-1");
            assertThat(consumerRecords.get(1).key()).isEqualTo("use-commit-topic-key-2");
            assertThat(consumerRecords.get(2).key()).isEqualTo("use-commit-topic-key-3");
        }

there are no messages left to fetch the next time.

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Arrays.asList(topicName));
            List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

            ConsumerRecords<String, String> records = consumer.poll(3000L);
            records.forEach(consumerRecords::add);
            consumer.commitSync();

            assertThat(consumerRecords).isEmpty();
        }

So, when auto commit is disabled, you can commit the fact that messages have been read by calling KafkaConsumer#commitSync (or commitAsync).

https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync--
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-java.util.Map-
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync--
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync-java.util.Map-org.apache.kafka.clients.consumer.OffsetCommitCallback-

If the KafkaConsumer is closed without committing, the read is never committed and the same messages will be delivered again.

In other words, committing means advancing and finalizing the position the consumer is pointing at.

The committed position is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs (e.g. commitSync and commitAsync).

https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
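
That idea can be sketched very roughly in plain Java (a toy model of the committed-position concept, not Kafka's implementation): the next read starts from the last committed offset, so a restart without a commit re-reads the same records, and committing makes the next read come back empty.

```java
import java.util.List;

// Toy model of the committed-position concept: a "restart" resumes from the
// last committed offset, so reading without committing re-delivers everything.
public class CommittedPosition {
    public static List<String> readFrom(List<String> log, int committedOffset) {
        return log.subList(committedOffset, log.size());
    }

    public static void main(String[] args) {
        List<String> log = List.of("m1", "m2", "m3");
        int committed = 0;                              // nothing committed yet

        List<String> first = readFrom(log, committed);  // reads m1..m3
        // close without committing -> committed stays 0
        List<String> again = readFrom(log, committed);  // reads m1..m3 again

        committed = log.size();                         // commitSync(): position finalized
        List<String> after = readFrom(log, committed);  // empty
        System.out.println(first.size() + " " + again.size() + " " + after.size());  // 3 3 0
    }
}
```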

Using Producer Transactions

Finally, let's try Producer Transactions. Here I'll check a few more variations.

I'll divide the patterns broadly by the behavioral difference between "read committed" and "read uncommitted", then dig into each.

One thing to note: it's the producer that controls the Transaction, while "read committed" / "read uncommitted" are settings that take effect on the consumer side.

Read Committed

The producer settings look like this.

        String brokerConnectString = "172.25.0.3:9092";

        Properties producerProperties = new Properties();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.RETRIES_CONFIG, 1);  // default "0"
        producerProperties.put(ProducerConfig.ACKS_CONFIG, "all");  // default "1"
        producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);  // default "5"
        producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // default "false"
        producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction-id");  // default "null"

        String topicName = "transactional-topic";

The key points for using Transactions are these settings.

        producerProperties.put(ProducerConfig.RETRIES_CONFIG, 1);  // default "0"
        producerProperties.put(ProducerConfig.ACKS_CONFIG, "all");  // default "1"
        producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);  // default "5"
        producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // default "false"
        producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction-id");  // default "null"

First, you need to specify a "transactional.id" to use for transactional delivery. Looking at its documentation, it says "enable.idempotence" must be set to "true".

The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. Note that enable.idempotence must be enabled if a TransactionalId is configured. The default is empty, which means transactions cannot be used.

https://kafka.apache.org/10/documentation.html#producerconfigs

Furthermore, the description of "enable.idempotence" says the producer will guarantee that each message is written exactly once. For that, "max.in.flight.requests.per.connection" must be 5 or less, "retries" must be greater than 0, and "acks" must be "all".

When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0 and acks must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigException will be thrown.

Which is how we arrive at the settings below. That said, the documentation says suitable values will be chosen automatically if you don't set them explicitly.

        producerProperties.put(ProducerConfig.RETRIES_CONFIG, 1);  // default "0"
        producerProperties.put(ProducerConfig.ACKS_CONFIG, "all");  // default "1"
        producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);  // default "5"
        producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // default "false"
        producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction-id");  // default "null"
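
Just as an illustration (this is not Kafka's actual implementation; the real check lives inside the producer and throws a ConfigException), the compatibility rules quoted above amount to something like:

```java
// Illustrative only: a simplified version of the compatibility check the Kafka
// docs describe for enable.idempotence. The real check is performed by the
// producer's config validation, not by user code.
public class IdempotenceCheck {
    public static void validate(int maxInFlight, int retries, String acks) {
        if (maxInFlight > 5) {
            throw new IllegalArgumentException(
                    "max.in.flight.requests.per.connection must be <= 5, got " + maxInFlight);
        }
        if (retries <= 0) {
            throw new IllegalArgumentException("retries must be > 0, got " + retries);
        }
        // "-1" is accepted here as the numeric spelling of acks=all
        if (!"all".equals(acks) && !"-1".equals(acks)) {
            throw new IllegalArgumentException("acks must be 'all', got " + acks);
        }
    }

    public static void main(String[] args) {
        validate(5, 1, "all");      // the settings used in this entry: compatible
        try {
            validate(5, 0, "all");  // retries=0 is incompatible with idempotence
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```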

For the consumer, let's set "isolation.level" to "read_committed" (the default is "read_uncommitted").

        String consumerGroup = "transactional-topic-group";

        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // default "latest"
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // default "true"
        consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");  // default "read_uncommitted"

To use Transactions, first call KafkaProducer#initTransactions.

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            producer.initTransactions();

            // see below
        }

Let's start with an abort: call KafkaProducer#beginTransaction, send messages, then call KafkaProducer#abortTransaction.

            // abort
            producer.beginTransaction();

            List<Future<RecordMetadata>> futures1 = new ArrayList<>();
            IntStream.rangeClosed(1, 10).forEach(i -> {
                String key = String.format("%s-key-%d", topicName, i);
                String value = String.format("%s-value-%d", topicName, i);
                futures1.add(producer.send(new ProducerRecord<>(topicName, key, value)));
            });

            for (Future<RecordMetadata> f : futures1) {
                f.get();
            }

            producer.abortTransaction();

As a result, the consumer cannot read the messages that were sent.

            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<>(consumerProperties)) {
                consumer.subscribe(Arrays.asList(topicName));
                List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

                ConsumerRecords<String, String> records = consumer.poll(3000L);
                records.forEach(consumerRecords::add);
                consumer.commitSync();

                assertThat(consumerRecords).isEmpty();
            }

If you replace the KafkaProducer#abortTransaction call with KafkaProducer#commitTransaction, the consumer becomes able to read the messages.

            // commit
            producer.beginTransaction();

            List<Future<RecordMetadata>> futures2 = new ArrayList<>();
            IntStream.rangeClosed(1, 10).forEach(i -> {
                String key = String.format("%s-key-%d", topicName, i);
                String value = String.format("%s-value-%d", topicName, i);
                futures2.add(producer.send(new ProducerRecord<>(topicName, key, value)));
            });

            for (Future<RecordMetadata> f : futures2) {
                f.get();
            }

            producer.commitTransaction();

            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<>(consumerProperties)) {
                consumer.subscribe(Arrays.asList(topicName));
                List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

                ConsumerRecords<String, String> records = consumer.poll(3000L);
                records.forEach(consumerRecords::add);
                consumer.commitSync();

                assertThat(consumerRecords).hasSize(10);
                assertThat(consumerRecords.get(0).key()).isEqualTo("transactional-topic-key-1");
                assertThat(consumerRecords.get(1).key()).isEqualTo("transactional-topic-key-2");
                assertThat(consumerRecords.get(2).key()).isEqualTo("transactional-topic-key-3");
            }

Note that with Transactions enabled, calling KafkaProducer#send without first calling KafkaProducer#initTransactions or KafkaProducer#beginTransaction throws an exception.

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            assertThatThrownBy(() -> {
                List<Future<RecordMetadata>> futures = new ArrayList<>();
                IntStream.rangeClosed(1, 10).forEach(i -> {
                    String key = String.format("%s-key-%d", topicName, i);
                    String value = String.format("%s-value-%d", topicName, i);
                    futures.add(producer.send(new ProducerRecord<>(topicName, key, value)));
                });
            })
                    .isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.");
        }

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            producer.initTransactions();

            assertThatThrownBy(() -> {
                List<Future<RecordMetadata>> futures = new ArrayList<>();
                IntStream.rangeClosed(1, 10).forEach(i -> {
                    String key = String.format("%s-key-%d", topicName, i);
                    String value = String.format("%s-value-%d", topicName, i);
                    futures.add(producer.send(new ProducerRecord<>(topicName, key, value)));
                });
            })
                    .isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot call send in state READY");
        }

Of course, the behavior is the same if you close the KafkaProducer after aborting or committing the Transaction and then read from the consumer.

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            producer.initTransactions();

            // abort
            producer.beginTransaction();

            List<Future<RecordMetadata>> futures = new ArrayList<>();
            IntStream.rangeClosed(1, 10).forEach(i -> {
                String key = String.format("%s-key-%d", topicName, i);
                String value = String.format("%s-value-%d", topicName, i);
                futures.add(producer.send(new ProducerRecord<>(topicName, key, value)));
            });

            for (Future<RecordMetadata> f : futures) {
                f.get();
            }

            producer.abortTransaction();
        }

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Arrays.asList(topicName));
            List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

            ConsumerRecords<String, String> records = consumer.poll(3000L);
            records.forEach(consumerRecords::add);
            consumer.commitSync();

            assertThat(consumerRecords).isEmpty();
        }

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            producer.initTransactions();

            // commit
            producer.beginTransaction();

            List<Future<RecordMetadata>> futures = new ArrayList<>();
            IntStream.rangeClosed(1, 10).forEach(i -> {
                String key = String.format("%s-key-%d", topicName, i);
                String value = String.format("%s-value-%d", topicName, i);
                futures.add(producer.send(new ProducerRecord<>(topicName, key, value)));
            });

            for (Future<RecordMetadata> f : futures) {
                f.get();
            }

            producer.commitTransaction();
        }

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Arrays.asList(topicName));
            List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

            ConsumerRecords<String, String> records = consumer.poll(3000L);
            records.forEach(consumerRecords::add);
            consumer.commitSync();

            assertThat(consumerRecords).hasSize(10);
            assertThat(consumerRecords.get(0).key()).isEqualTo("transactional-topic-key-1");
            assertThat(consumerRecords.get(1).key()).isEqualTo("transactional-topic-key-2");
            assertThat(consumerRecords.get(2).key()).isEqualTo("transactional-topic-key-3");
        }

Now let's verify the "read committed" aspect itself: call KafkaProducer#beginTransaction and send messages.

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            producer.initTransactions();

            producer.beginTransaction();

            List<Future<RecordMetadata>> futures1 = new ArrayList<>();
            IntStream.rangeClosed(1, 10).forEach(i -> {
                String key = String.format("%s-key-%d", topicName, i);
                String value = String.format("%s-value-%d", topicName, i);
                futures1.add(producer.send(new ProducerRecord<>(topicName, key, value)));
            });

            for (Future<RecordMetadata> f : futures1) {
                f.get();
            }

            // see below

At this point, try to read the messages from the consumer without committing. They can't be read.

            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<>(consumerProperties)) {
                consumer.subscribe(Arrays.asList(topicName));
                List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

                ConsumerRecords<String, String> records = consumer.poll(3000L);
                records.forEach(consumerRecords::add);
                consumer.commitSync();

                assertThat(consumerRecords).isEmpty();
            }

Now commit.

            producer.commitTransaction();

This time, the messages can be read.

            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<>(consumerProperties)) {
                consumer.subscribe(Arrays.asList(topicName));
                List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

                ConsumerRecords<String, String> records = consumer.poll(3000L);
                records.forEach(consumerRecords::add);
                consumer.commitSync();

                assertThat(consumerRecords).hasSize(10);
                assertThat(consumerRecords.get(0).key()).isEqualTo("transactional-topic-key-1");
                assertThat(consumerRecords.get(1).key()).isEqualTo("transactional-topic-key-2");
                assertThat(consumerRecords.get(2).key()).isEqualTo("transactional-topic-key-3");
            }
        }

That is indeed "read committed".
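
Conceptually, what "read_committed" does can be sketched like this (a toy model in plain Java; the real consumer filters using transaction markers in the log, not a status map): only records whose transaction has committed are delivered by poll.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Conceptual sketch only: under read_committed, a record becomes visible only
// once its transaction has committed; aborted and in-flight records are hidden.
public class ReadCommittedSketch {
    public enum TxStatus { COMMITTED, ABORTED, ONGOING }

    public static List<String> poll(Map<String, TxStatus> recordsByTx) {
        return recordsByTx.entrySet().stream()
                .filter(e -> e.getValue() == TxStatus.COMMITTED)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, TxStatus> log = Map.of(
                "aborted-message", TxStatus.ABORTED,
                "committed-message", TxStatus.COMMITTED,
                "in-flight-message", TxStatus.ONGOING);
        System.out.println(poll(log));  // only "committed-message" is visible
    }
}
```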

Read Uncommitted

For completeness, let's also try "read uncommitted". As noted above, "read_uncommitted" is the consumer-side default.

Producer-side and consumer-side settings: apart from "isolation.level" being "read_uncommitted", these are the same as in the read committed case (though the topic name and consumer group name differ).

        String brokerConnectString = "172.25.0.3:9092";

        Properties producerProperties = new Properties();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.RETRIES_CONFIG, 1);  // default "0"
        producerProperties.put(ProducerConfig.ACKS_CONFIG, "all");  // default "1"
        producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);  // default "5"
        producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // default "false"
        producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction-id");  // default "null"

        String topicName = "uncommitted-transactional-topic";

        String consumerGroup = "uncommitted-transactional-topic-group";

        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectString);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // default "latest"
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // default "true"
        consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");  // default

With this configuration, surprisingly, the Consumer can read the messages even when the Transaction is aborted...

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            producer.initTransactions();

            // abort
            producer.beginTransaction();

            List<Future<RecordMetadata>> futures1 = new ArrayList<>();
            IntStream.rangeClosed(1, 10).forEach(i -> {
                String key = String.format("%s-key-%d", topicName, i);
                String value = String.format("%s-value-%d", topicName, i);
                futures1.add(producer.send(new ProducerRecord<>(topicName, key, value)));
            });

            for (Future<RecordMetadata> f : futures1) {
                f.get();
            }

            producer.abortTransaction();

            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<>(consumerProperties)) {
                consumer.subscribe(Arrays.asList(topicName));
                List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

                ConsumerRecords<String, String> records = consumer.poll(3000L);
                records.forEach(consumerRecords::add);
                consumer.commitSync();

                assertThat(consumerRecords).hasSize(10);
                assertThat(consumerRecords.get(0).key()).isEqualTo("uncommitted-transactional-topic-key-1");
                assertThat(consumerRecords.get(1).key()).isEqualTo("uncommitted-transactional-topic-key-2");
                assertThat(consumerRecords.get(2).key()).isEqualTo("uncommitted-transactional-topic-key-3");
            }

            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<>(consumerProperties)) {
                consumer.subscribe(Arrays.asList(topicName));
                List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

                ConsumerRecords<String, String> records = consumer.poll(3000L);
                records.forEach(consumerRecords::add);
                consumer.commitSync();

                assertThat(consumerRecords).isEmpty();
            }


            // commit
            producer.beginTransaction();

            List<Future<RecordMetadata>> futures2 = new ArrayList<>();
            IntStream.rangeClosed(1, 10).forEach(i -> {
                String key = String.format("%s-key-%d", topicName, i);
                String value = String.format("%s-value-%d", topicName, i);
                futures2.add(producer.send(new ProducerRecord<>(topicName, key, value)));
            });

            for (Future<RecordMetadata> f : futures2) {
                f.get();
            }

            producer.commitTransaction();

            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<>(consumerProperties)) {
                consumer.subscribe(Arrays.asList(topicName));
                List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

                ConsumerRecords<String, String> records = consumer.poll(3000L);
                records.forEach(consumerRecords::add);
                consumer.commitSync();

                assertThat(consumerRecords).hasSize(10);
                assertThat(consumerRecords.get(0).key()).isEqualTo("uncommitted-transactional-topic-key-1");
                assertThat(consumerRecords.get(1).key()).isEqualTo("uncommitted-transactional-topic-key-2");
                assertThat(consumerRecords.get(2).key()).isEqualTo("uncommitted-transactional-topic-key-3");
            }
        }

True to the name "Read Uncommitted", messages can be read even before the Commit.

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            producer.initTransactions();

            producer.beginTransaction();

            List<Future<RecordMetadata>> futures1 = new ArrayList<>();
            IntStream.rangeClosed(1, 10).forEach(i -> {
                String key = String.format("%s-key-%d", topicName, i);
                String value = String.format("%s-value-%d", topicName, i);
                futures1.add(producer.send(new ProducerRecord<>(topicName, key, value)));
            });

            for (Future<RecordMetadata> f : futures1) {
                f.get();
            }

            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<>(consumerProperties)) {
                consumer.subscribe(Arrays.asList(topicName));
                List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

                ConsumerRecords<String, String> records = consumer.poll(3000L);
                records.forEach(consumerRecords::add);
                consumer.commitSync();

                assertThat(consumerRecords).hasSize(10);
                assertThat(consumerRecords.get(0).key()).isEqualTo("uncommitted-transactional-topic-key-1");
                assertThat(consumerRecords.get(1).key()).isEqualTo("uncommitted-transactional-topic-key-2");
                assertThat(consumerRecords.get(2).key()).isEqualTo("uncommitted-transactional-topic-key-3");
            }

            // Commit!!!
            producer.commitTransaction();

            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<>(consumerProperties)) {
                consumer.subscribe(Arrays.asList(topicName));
                List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

                ConsumerRecords<String, String> records = consumer.poll(3000L);
                records.forEach(consumerRecords::add);
                consumer.commitSync();

                assertThat(consumerRecords).isEmpty();
            }
        }

What's more, the messages are then treated as already consumed: commitSync() advances the Consumer Group's offsets past the records read before the Commit, so a subsequent Consumer in the same group gets nothing...

Calling KafkaProducer#close doesn't change this behavior either.

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            producer.initTransactions();

            // abort
            producer.beginTransaction();

            List<Future<RecordMetadata>> futures = new ArrayList<>();
            IntStream.rangeClosed(1, 10).forEach(i -> {
                String key = String.format("%s-key-%d", topicName, i);
                String value = String.format("%s-value-%d", topicName, i);
                futures.add(producer.send(new ProducerRecord<>(topicName, key, value)));
            });

            for (Future<RecordMetadata> f : futures) {
                f.get();
            }

            producer.abortTransaction();
        }

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Arrays.asList(topicName));
            List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

            ConsumerRecords<String, String> records = consumer.poll(3000L);
            records.forEach(consumerRecords::add);
            consumer.commitSync();

            assertThat(consumerRecords).hasSize(10);
            assertThat(consumerRecords.get(0).key()).isEqualTo("uncommitted-transactional-topic-key-1");
            assertThat(consumerRecords.get(1).key()).isEqualTo("uncommitted-transactional-topic-key-2");
            assertThat(consumerRecords.get(2).key()).isEqualTo("uncommitted-transactional-topic-key-3");
        }

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            producer.initTransactions();

            // commit
            producer.beginTransaction();

            List<Future<RecordMetadata>> futures = new ArrayList<>();
            IntStream.rangeClosed(1, 10).forEach(i -> {
                String key = String.format("%s-key-%d", topicName, i);
                String value = String.format("%s-value-%d", topicName, i);
                futures.add(producer.send(new ProducerRecord<>(topicName, key, value)));
            });

            for (Future<RecordMetadata> f : futures) {
                f.get();
            }

            producer.commitTransaction();
        }

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Arrays.asList(topicName));
            List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();

            ConsumerRecords<String, String> records = consumer.poll(3000L);
            records.forEach(consumerRecords::add);
            consumer.commitSync();

            assertThat(consumerRecords).hasSize(10);
            assertThat(consumerRecords.get(0).key()).isEqualTo("uncommitted-transactional-topic-key-1");
            assertThat(consumerRecords.get(1).key()).isEqualTo("uncommitted-transactional-topic-key-2");
            assertThat(consumerRecords.get(2).key()).isEqualTo("uncommitted-transactional-topic-key-3");
        }

Given this behavior, if you are going to use Transactions, it seems best to pair them with Read Committed on the Consumer side.
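As a recap of the single setting that makes the difference, here is a minimal sketch using plain java.util.Properties (no broker required); the "isolation.level" key string is what ConsumerConfig.ISOLATION_LEVEL_CONFIG resolves to:

```java
import java.util.Properties;

public class IsolationLevelSketch {
    public static void main(String[] args) {
        // Consumer-side default in Kafka 1.0.0: aborted transactional
        // messages are still delivered to the Consumer.
        Properties readUncommitted = new Properties();
        readUncommitted.put("isolation.level", "read_uncommitted");

        // Opt-in: the Consumer only sees messages from committed
        // Transactions; aborted ones are filtered out.
        Properties readCommitted = new Properties();
        readCommitted.put("isolation.level", "read_committed");

        System.out.println(readUncommitted.getProperty("isolation.level"));
        System.out.println(readCommitted.getProperty("isolation.level"));
    }
}
```

Everything else in the Consumer configuration can stay exactly as shown above.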

Summary

I tried out Commit on the Consumer side and Transactions on the Producer side with the Apache Kafka Java client.

My deep-dive into the API, and my verification of the underlying concepts, are still far from sufficient, but I'll get more familiar with them little by little.

For now, I've at least gotten a feel for how this works.