CLOVER🍀

That was when it all began.

Spring Cloud Stream(+Apache Kafka)でDead Letter Queueを試す

Spring Cloud Streamでは、Binderのエラーハンドリングの仕組みとして、Dead Letter Queueというものがあります。

Event Driven Microservices with Spring Cloud Stream #jjug_ccc #ccc_ab3 / Error Handling

RabbitMQにDead Letter Exchanges(DLX)というもので備わっているようですが、
RabbitMQ - Dead Letter Exchanges

Spring Cloud StreamではApache Kafkaでも近いことができるようです。

今回は、Apache Kafkaを使ってやってみます。

参照するドキュメントは、こちら。

Apache KafkaのConsumerとしての設定。
Apache Kafka Binder / Configuration Options / Kafka Consumer Properties

Dead Letter Queue(Apache Kafkaの場合、Topicですが…)の処理方法の例。
Apache Kafka Binder / Dead-Letter Topic Processing

また、Apache KafkaかRabbitMQかによらずリトライ回数の指定も考慮します。
Configuration Options / Binding Properties / Consumer properties

RabbitMQの場合は、もっといろいろ設定できそうですが。
RabbitMQ Binder / Configuration Options / RabbitMQ Consumer Properties
RabbitMQ Binder / Dead-Letter Queue Processing

では、やってみましょう。

お題

以下のような、SourceとSink×2(ProducerとConsumer×2)を作成して試してみます。

  • Source(Producer) … Spring MVCのRestControllerでデータを受け取り、Brokerに登録
  • Sink - 1(Consumer 1) … データを受け取り、標準出力に内容を書き出す。メッセージの内容によっては、処理が失敗したものとして例外を投げる
  • Sink - 2(Consumer 2) … ドキュメントの「Dead-Letter Topic Processing」に従い、DLQ(Dead Letter Queue / Topic)に入ったメッセージを受け取り、リトライ&規定回数を越えたら別のTopicへ転送

Message Brokerには、Apache Kafkaを使用します。

準備

今回のアプリケーションは、Mavenのマルチプロジェクトで作成するものとします。

親プロジェクトは、こんな内容を書いておきます。

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>Chelsea.SR2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>1.5.6.RELEASE</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Spring Cloud StreamのBOMを使うということと、Spring Boot Pluginの設定だけですね。その他のプロジェクトは、このサブプロジェクトとします。

各サブプロジェクトのpomは、親の設定と以下の依存関係が入っているだけなので、省略します。

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
    </dependencies>

あと、Apache Kafkaは起動済みという前提で。

Source(Producer)

ソースコードは、こんな感じ。
api-source/src/main/java/org/littlewings/spring/cloud/ApiSourceApp.java

package av;

import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@EnableBinding(Source.class)
@RestController
public class ApiSourceApp {
    public static void main(String... args) {
        SpringApplication.run(ApiSourceApp.class, args);
    }

    Source source;

    public ApiSourceApp(Source source) {
        this.source = source;
    }

    @PostMapping("register")
    @ResponseStatus(HttpStatus.CREATED)
    public Map<String, Object> register(@RequestBody Map<String, Object> message) {
        source.output().send(MessageBuilder.withPayload(message).build());
        return message;
    }
}

受け取ったデータをBrokerに送ったら、おしまいです。

設定上も、特に面白いところはありません。Topicの名前は、「message-topic」としました。
api-source/src/main/resources/application.properties

spring.cloud.stream.bindings.output.destination=message-topic
spring.cloud.stream.bindings.output.contentType=application/json

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

Sink - 1(Consumer - 1)

続いて、Sink。このSinkでは、送られてきたメッセージ内の、「routeDlq」という項目が「true」であれば例外をスローさせるものとします。

ソースコードは、こんな感じ。
switch-sink/src/main/java/org/littlewings/spring/cloud/SwitchSink.java

package org.littlewings.spring.cloud;

import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class SwitchSink {
    public static void main(String... args) {
        SpringApplication.run(SwitchSink.class, args);

    }

    @StreamListener(Sink.INPUT)
    public void receive(Map<String, Object> message) {
        if (Boolean.valueOf(message.get("routeDlq").toString())) {
            System.out.printf("to dead-letter-queue/topic = %s%n", message);
            throw new RuntimeException("Oops!!");
        } else {
            System.out.printf("received message = %s%n", message);
        }
    }
}

「routeDlq」という項目が「true」でなければ、そのまま標準出力に出力します。

設定ファイルは、こんな感じにしました。
switch-sink/src/main/resources/application.properties

spring.cloud.stream.bindings.input.destination=message-topic
spring.cloud.stream.bindings.input.group=message-group

spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
# spring.cloud.stream.kafka.bindings.input.consumer.dlqName=message-dlq

# spring.cloud.stream.bindings.input.consumer.maxAttempts=4

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

server.port=18080

「spring.cloud.stream.kafka.bindings.input.consumer.enableDlq」というパラメータを「true」にすることで、Dead Letter Queue(Topic)が
有効になり、Sink(Consumer)でのエラー時に、規定回数リトライ後にDLQにメッセージが転送されるようになります。

ここで送られるDLQの名前は、デフォルトでは

error.<destination>.<group>

となります。

今回の設定では、

error.message-topic.message-group

となり、このTopic名を使用することにします。

また、DLQ名を明示したければ「spring.cloud.stream.kafka.bindings.input.consumer.dlqName」というパラメーターで指定します。今回は、コメントアウト
していますが。

ここまでは、Apache Kafka Consumer側の設定として行います。
Apache Kafka Binder / Configuration Options / Kafka Consumer Properties

リトライ回数については、デフォルトでは3回です。これを変更する場合は、「spring.cloud.stream.bindings.input.consumer.maxAttempts」という
パラメーターで指定します。1にすると、リトライしなくなります。

これは、Apache Kafka、RabbitMQを問わず、Spring Cloud StreamのConsumerとしての設定です。
Configuration Options / Binding Properties / Consumer properties

あと、Sourceとポートがかぶらないように、別ポートにしています。

Sink - 2(Consumer - 2)

最後に、Dead Letter Queue(Topic)に入ったメッセージ(Sink - 1が処理できなかったメッセージ)を処理するコードを書いてみましょう。

Dead Letter Queue(Topic)に入ったメッセージを処理する、ドキュメントおよびサンプルは、こちら。
Apache Kafka Binder / Dead-Letter Topic Processing

このドキュメントで言っていることを、ざっと。

Spring Cloud Streamでは、Dead Letterとなったメッセージをどのように処理するか、標準的な仕組みは提供していません。

Dead Letterとなった理由が一時的なものであれば、元のTopicに戻すことができるかもしれませんし、そうでなければ無限ループ(元のTopic → Dead Letter)の
原因となる可能性もあります。ドキュメントのサンプルでは、Dead Letter Queueから受け取ったメッセージを元のTopicに戻し、同じメッセージを
3回試行してダメだったら、「parking lot」Topicに移します(駐車場、の意)。

ドキュメントでは、以下の事項を注意点として挙げています。

  • メインのアプリケーション(今回で言えばSink 1)が実行されていない時に、再ルーティングの処理を行うこと。でなければ、元のTopicに戻ったメッセージがすぐにメインのアプリケーションに処理され、またエラーになるかもしれません
  • または、このアプリケーションを使用して3つ目のTopicにルーティングし、別のTopicから元のTopicに戻すというアプローチもあります
  • この手法では、再試行を追跡するためにメッセージヘッダーを使用するため、「headerMode = raw」では機能しません。rawにする場合は、Payloadになんらかのデータを追加することを検討してください
  • 「x-retries」ヘッダーを追加して、メインのアプリケーションとDead Letterを処理するアプリケーションの間でヘッダーが転送されるようにします
  • Apache KafkaではPublis/Subscribeするので、最初に正常に処理したものであっても、Dead Letterから再投入され再生されたメッセージは、各Consumer Groupに送信されます

で、今回はメイン側のアプリケーションは止めません。なので、すぐさま再ルーティングされ、Dead Letter行きになります。さらに言うと、リトライ試行後に
諦めた場合の転送先(ParkingLot)行きになります。

ヘッダーの追加は、そのまま再現します。

という感じで、作成したコードはこちら。
routing-sink/src/main/java/org/littlewings/spring/cloud/RoutingSinkApp.java

package org.littlewings.spring.cloud;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding({Sink.class, RoutingSinkApp.DeadLetterTopicSource.class})
public class RoutingSinkApp {
    MessageChannel parkingLot;

    public RoutingSinkApp(@Output("parking-lot") MessageChannel parkingLot) {
        this.parkingLot = parkingLot;
    }

    public static void main(String... args) {
        SpringApplication.run(RoutingSinkApp.class, args);
    }

    @StreamListener(Sink.INPUT)
    @SendTo(Source.OUTPUT)
    public Message<?> routing(Message<Map<String, Object>> message) throws InterruptedException {
        System.out.println("=== headers");
        message.getHeaders().entrySet().forEach(e -> System.out.printf("  %s%n", e));

        if (Boolean.valueOf(message.getPayload().get("routeDlq").toString())) {
            Integer retries = message.getHeaders().get("x-retries", Integer.class);
            if (retries == null) {
                System.out.printf("first retry: %s%n", message.getPayload());

                System.out.println("sleep...");
                TimeUnit.SECONDS.sleep(3L);
                System.out.println();

                message.getPayload().put("retryCount", 1);

                return MessageBuilder
                        .fromMessage(message)
                        .setHeader("x-retries", 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            } else if (retries.intValue() < 3) {
                System.out.printf("retrying... %d, %s%n", retries, message.getPayload());

                System.out.println("sleep...");
                TimeUnit.SECONDS.sleep(3L);
                System.out.println();

                message.getPayload().put("retryCount", retries + 1);

                return MessageBuilder
                        .fromMessage(message)
                        .setHeader("x-retries", retries + 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            } else {
                System.out.printf("go paking-lot = %s%n", message.getPayload());
                System.out.println();

                message.getPayload().put("goPakingLot", "true");
                parkingLot
                        .send(MessageBuilder.fromMessage(message)
                                .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                        message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                                .build());
                return null;
            }
        } else {
            System.out.printf("received message = %s%n", message.getPayload());
            System.out.println();
            return null;
        }
    }

    public interface DeadLetterTopicSource extends Source {
        @Output("parking-lot")
        MessageChannel parkingLot();
    }
}

ちょっと名前とか変えていますが、概ね似たような感じです。

なんとなく、ヘッダーの内容を出力するようにしてみました。

        System.out.println("=== headers");
        message.getHeaders().entrySet().forEach(e -> System.out.printf("  %s%n", e));

「routeDlq」が「true」であれば、リトライを行い、再度元のTopicに送信します。

        if (Boolean.valueOf(message.getPayload().get("routeDlq").toString())) {

初回は「x-retries」ヘッダーがないので、1にしてヘッダーを追加。2回目以降は、「x-retries」ヘッダーの値が3になるまでリトライを行います。

            Integer retries = message.getHeaders().get("x-retries", Integer.class);
            if (retries == null) {
                System.out.printf("first retry: %s%n", message.getPayload());

                System.out.println("sleep...");
                TimeUnit.SECONDS.sleep(3L);
                System.out.println();

                message.getPayload().put("retryCount", 1);

                return MessageBuilder
                        .fromMessage(message)
                        .setHeader("x-retries", 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            } else if (retries.intValue() < 3) {
                System.out.printf("retrying... %d, %s%n", retries, message.getPayload());

                System.out.println("sleep...");
                TimeUnit.SECONDS.sleep(3L);
                System.out.println();

                message.getPayload().put("retryCount", retries + 1);

                return MessageBuilder
                        .fromMessage(message)
                        .setHeader("x-retries", retries + 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();

あんまりすぐに元のTopicにルーティングすると、ちょっと速すぎたので軽くスリープさせておきました。

送信用のメッセージを組む再には、「x-retires」ヘッダの値をインクリメントするのと、同じパーティションに送られる用に調整。こちらは、
ドキュメントと同様です。

                return MessageBuilder
                        .fromMessage(message)
                        .setHeader("x-retries", retries + 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();

今回は、ヘッダー以外にもPayload自体にリトライ試行回数を含めるようにしました。

                message.getPayload().put("retryCount", 1);

                message.getPayload().put("retryCount", retries + 1);

見た目的にもわかりやすいでしょう。

3回のリトライを越えると、リトライ試行後に諦め、別のTopic行きにします。

            } else {
                System.out.printf("go paking-lot = %s%n", message.getPayload());
                System.out.println();

                message.getPayload().put("goPakingLot", "true");
                parkingLot
                        .send(MessageBuilder.fromMessage(message)
                                .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                        message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                                .build());
                return null;
            }

この時は、「goPakingLot」という項目を追加しておきました。

条件分岐の末端についているのは、「routeDlq」が未指定または「false」だったら単に標準出力へ書き出すコードですが、今回はここには
到達しませんね。

        } else {
            System.out.printf("received message = %s%n", message.getPayload());
            System.out.println();
            return null;
        }

ソースコードの最後に定義したるのは、リトライ試行後に諦めた場合の転送先用のTopicに送るためのインターフェースです。

    public interface DeadLetterTopicSource extends Source {
        @Output("parking-lot")
        MessageChannel parkingLot();
    }

アプリケーションの設定は、こちら。
routing-sink/src/main/resources/application.properties

spring.cloud.stream.bindings.input.destination=error.message-topic.message-group
spring.cloud.stream.bindings.input.group=message-relay-group

spring.cloud.stream.bindings.output.destination=message-topic
spring.cloud.stream.bindings.output.contentType=application/json

spring.cloud.stream.bindings.parking-lot.destination=parking-lot
spring.cloud.stream.bindings.parking-lot.contentType=application/json

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

spring.cloud.stream.kafka.binder.headers=x-retries

server.port=28080

とりあえず、リッスンポートはずらしました。

入力となるTopicは、Dead Letter Queueである「error.message-topic.message-group」となります。

書き出し先となるTopicは2つあり、それぞれ元のTopicとリトライ試行後に諦めた場合の転送先、ParkingLotです。

あとは、「spring.cloud.stream.kafka.binder.headers」でカスタムヘッダー「x-retries」を指定しています。これを入れていないと、
ヘッダーを付与しても無視されてしまいます。

動作確認

それでは、パッケージングして

$ mvn package

動かしてみましょう。

## Source(Producer)
$ java -jar api-source/target/api-source-0.0.1-SNAPSHOT.jar

## Sink - 1(Consumer - 1)
$ java -jar switch-sink/target/switch-sink-0.0.1-SNAPSHOT.jar

## Sink - 2(Consumer - 2)
$ java -jar routing-sink/target/routing-sink-0.0.1-SNAPSHOT.jar

ここで、データを送るのに簡単なスクリプトを用意。
post.sh

#!/bin/bash

REJECT=$1

if [ "${REJECT}" == "" ]; then
  REJECT=false
fi

NOW=`date '+%Y-%m-%d %H:%M:%S'`

curl -i -XPOST -H 'Content-Type: application/json' http://localhost:8080/register -d '{"message": "Hello World", "routeDlq": "'"$REJECT"'", "time": "'"$NOW"'"}'

わかりやすいように、実行時の時間をメッセージに入れるようにしました。
あと、引数をtrue/falseにすることとで、「routeDlq」の値を変えることができます。

では、確認。

とりあえず、ふつうに処理できるメッセージを送ります。

$ ./post.sh false
HTTP/1.1 201 
X-Application-Context: application
Content-Type: application/json;charset=UTF-8
Transfer-Encoding: chunked
Date: Sat, 05 Aug 2017 08:30:58 GMT

{"message":"Hello World","routeDlq":"false","time":"2017-08-05 17:30:58"}

Producerは置いておいて…Sink - 1側ではこういう出力が行われます。

received message = {message=Hello World, routeDlq=false, time=2017-08-05 17:30:58}

続いて、Dead Letter Queue行きのメッセージを送ります。

$ ./post.sh true
HTTP/1.1 201 
X-Application-Context: application
Content-Type: application/json;charset=UTF-8
Transfer-Encoding: chunked
Date: Sat, 05 Aug 2017 08:31:55 GMT

{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:31:55"}

すると、Sink - 1、Sink - 2でこういう挙動になります。

## Sink - 1
to dead-letter-queue/topic = {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55}
to dead-letter-queue/topic = {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55}
to dead-letter-queue/topic = {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55}


## Sink - 2
=== headers
  kafka_offset=0
  id=165242f7-65d7-de22-5ec4-b0d7babb6733
  kafka_receivedPartitionId=0
  contentType=application/json;charset=UTF-8
  kafka_receivedTopic=error.message-topic.message-group
  timestamp=1501921918879
first retry: {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55}
sleep...


## Sink - 1
to dead-letter-queue/topic = {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55, retryCount=1}
to dead-letter-queue/topic = {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55, retryCount=1}
to dead-letter-queue/topic = {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55, retryCount=1}


## Sink - 2
=== headers
  x-retries=1
  kafka_offset=1
  id=71fb5fcd-9371-2be0-37dc-d8269fed12e8
  kafka_receivedPartitionId=0
  contentType=application/json;charset=UTF-8
  kafka_receivedTopic=error.message-topic.message-group
  timestamp=1501921925008
retrying... 1, {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55, retryCount=1}
sleep...


## Sink - 1
to dead-letter-queue/topic = {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55, retryCount=2}
to dead-letter-queue/topic = {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55, retryCount=2}
to dead-letter-queue/topic = {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55, retryCount=2}


## Sink - 2
=== headers
  x-retries=2
  kafka_offset=2
  id=2bf4da61-8679-6fa9-8f6d-32bc81ab55a3
  kafka_receivedPartitionId=0
  contentType=application/json;charset=UTF-8
  kafka_receivedTopic=error.message-topic.message-group
  timestamp=1501921931052
retrying... 2, {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55, retryCount=2}
sleep...


## Sink - 1
to dead-letter-queue/topic = {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55, retryCount=3}
to dead-letter-queue/topic = {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55, retryCount=3}
to dead-letter-queue/topic = {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55, retryCount=3}


## Sink - 2
=== headers
  x-retries=3
  kafka_offset=3
  id=00e40c5c-eb15-fe63-ae29-da36421e9904
  kafka_receivedPartitionId=0
  contentType=application/json;charset=UTF-8
  kafka_receivedTopic=error.message-topic.message-group
  timestamp=1501921937086
go paking-lot = {message=Hello World, routeDlq=true, time=2017-08-05 17:31:55, retryCount=3}

Sink - 1側では例外を投げますが、規定回数リトライを行います。ここで規定回数を越えるとDead Letter行きとなります。

Dead Letterを受け取ったSink - 2側では、「x-retries」ヘッダが3に達しない限り、元のTopicにメッセージを送り返します。この時に、リトライした回数
(「retryCount」)をメッセージに含めます。

すると、Sink - 1側で戻ってきたメッセージを読み出して…再度Dead Letterに行きます。

これを3回繰り返すと、諦めて別のTopicに転送する、という感じですね。

あともう1回ずつ、メッセージを投げておきます。

$ ./post.sh false
$ ./post.sh true

Apache Kafkaのコマンドラインツールで、Topicの内容を見てみましょう。

まずは、メインのTopic。

$ bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092 --topic message-topic --from-beginning
&#65533;^B
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"{"message":"Hello World","routeDlq":"false","time":"2017-08-05 17:30:58"}
&#65533;^B
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:31:55"}
&#65533;^C
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"	x-retries^A1{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:31:55","retryCount":1}
&#65533;^C
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"	x-retries^A2{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:31:55","retryCount":2}
&#65533;^C
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"	x-retries^A3{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:31:55","retryCount":3}
&#65533;^B
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"{"message":"Hello World","routeDlq":"false","time":"2017-08-05 17:36:29"}
&#65533;^B
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:36:33"}
&#65533;^C
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"	x-retries^A1{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:36:33","retryCount":1}
&#65533;^C
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"	x-retries^A2{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:36:33","retryCount":2}
&#65533;^C
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"	x-retries^A3{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:36:33","retryCount":3}
Processed a total of 10 messages

Dead Letter Queueに行かないメッセージを2つ、Dead Letter Queueへ行くメッセージを2つそれぞれ投げているので、Dead Letter Queueに行った方は
リトライが3回分入り、6回増えます。

というわけで、全部で10メッセージですね。retryCountを見ると、「x-retries」ヘッダーの利用による試行の回数に合わせて増えていっているのがわかります。

Dead Letter Queueの方は、8メッセージ。

$ bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092 --topic error.message-topic.message-group --from-beginning
&#65533;^B
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:31:55"}
&#65533;^C
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"	x-retries^A1{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:31:55","retryCount":1}
&#65533;^C
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"	x-retries^A2{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:31:55","retryCount":2}
&#65533;^C
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"	x-retries^A3{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:31:55","retryCount":3}
&#65533;^B
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:36:33"}
&#65533;^C
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"	x-retries^A1{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:36:33","retryCount":1}
&#65533;^C
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"	x-retries^A2{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:36:33","retryCount":2}
&#65533;^C
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"	x-retries^A3{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:36:33","retryCount":3}
Processed a total of 8 messages

Dead Letter Queueから、元のTopicへの再送を諦めた場合の転送先Topicは、2つのメッセージになります。このメッセージには、「goPakingLot」という
項目が増えています。

$ bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092 --topic parking-lot --from-beginning
&#65533;^C
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"	x-retries^A3{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:31:55","retryCount":3,"goPakingLot":"true"}
&#65533;^C
  contentType
             "text/plain"^SoriginalContentType "application/json;charset=UTF-8"	x-retries^A3{"message":"Hello World","routeDlq":"true","time":"2017-08-05 17:36:33","retryCount":3,"goPakingLot":"true"}
Processed a total of 2 messages

とりあえず、動きは掴めた感じです。

もうちょっと内容を

どういう理屈で動いているのか、軽く見てみましょう。

Spring Cloud Streamの本体で、リトライの設定を扱うRetryTemplateを構築します。
https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java#L285

この中で、先ほどのアプリケーションで設定…しませんでしたが、maxAttemptsが使用されることになります。

で、このRetryTemplateをKafka Binder側で利用して
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/v1.2.1.RELEASE/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java#L274

MessageListenerのErrorHandlerとして、Dead Letter Queueへの送信処理を実装している、という感じです。
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/v1.2.1.RELEASE/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java#L279-L313

バージョン変わると、また雰囲気変わってそうですが、現時点でのGAだとこうです。

まとめ

Spring Cloud Streamで、Apache Kafkaを使った場合のDead Letter Queueを試してみました。

けっこう気になっていた機能だったのですが、挙動と設定をある程度把握するのにちょっと時間がかかってしまいました…。まあ、とっかかりとしては
把握できたので、良しとしましょう。