CLOVER🍀

That was when it all began.

RabbitMQのRoutingを試してみる

RabbitMQのチュートリアルで遊んでみるシリーズ、今度はRoutingです。

RabbitMQ - RabbitMQ tutorial - Routing

このひとつ前のチュートリアルでは、Producer側が送ったメッセージがConsumerにブロードキャストするといったものでした。

今回のチュートリアルでは、Consumer側がメッセージのサブセットを受け取れるようにしてみます。このエントリでは、単純なメッセージの振り分けに加えて、元のチュートリアルのログ出力サンプルをマネたものを作ってみたいと思います。

メッセージをブロードキャストしていた時の振り返り

Producer側から送ったメッセージをブロードキャストしていた時のConsumer側のコードは、こんな感じでした。

        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, "logs", "");

Channel#queueBindの第3引数が空文字です。

この第3引数は、routingKeyという特別なもので、このチュートリアルではこれをバインディングキーと呼ぶそうです(Channel#queueBindで、Exchangeとキューをバインドして関連付けますが、その時のキーとなっているので)。

バインディングキーの意味は、Exchangeの種類に依存します。このひとつ前のチュートリアルでは、Exchangeの種類は「fanout」となっていましたが、この場合はバインディングキーは無視されるそうです。

Exchangeの種類については、ここを見ればよいみたいですね。

RabbitMQ - AMQP 0-9-1 Model Explained

それでは、進めてみましょう。

準備

まず、RabbitMQは起動済みとします。

そして、クライアントライブラリにJavaを利用するので、Maven依存関係を定義します。こちらです。

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.2</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.4.1</version>
            <scope>test</scope>
        </dependency>

テストコード用に、JUnitとAssertJ付きです。

テストコードの雛形

テストコードで動作確認をしますが、import文などの大枠はこんな感じです。
src/test/java/org/littlewings/rabbitmq/routing/RoutingTest.java

package org.littlewings.rabbitmq.routing;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class RoutingTest {
    // ここに、テストを書く!
}

Producer側を書く

ここでは、Producerから送信するメッセージを、特定のExchange、キューにバインドしたConsumerに送るコードを書きます。

前回のチュートリアルでは「fanout」でブロードキャストしたわけですが、今回は「direct」を使用します。

「direct」を使用すると、routingKeyにマッチしたキューにメッセージが送信されることになります。

書いたコードは、こちらです。

    public void publish(String routingKey, String... messages) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("kazuhira");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("messageExchange", "direct");

        Arrays.asList(messages).forEach(message -> {
            try {
                channel.basicPublish("messageExchange", routingKey, null, message.getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

        channel.close();
        connection.close();
    }

routingKeyは、メソッドの引数としてもらうようにしました。

Exchangeを「direct」で定義して

        channel.exchangeDeclare("messageExchange", "direct");

メッセージをroutingKeyを指定して送信します。Channel#basicPublishの第2引数に、routingKeyを指定しているところがポイントです。

        Arrays.asList(messages).forEach(message -> {
            try {
                channel.basicPublish("messageExchange", routingKey, null, message.getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

Consumer側を書く

続いて、Consumer側を書いてみます。

書いたコードは、こちら。

    public List<String> subscribe(String... routingKeys) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("kazuhira");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("messageExchange", "direct");
        String queueName = channel.queueDeclare().getQueue();

        Arrays.asList(routingKeys).forEach(routingKey -> {
            try {
                channel.queueBind(queueName, "messageExchange", routingKey);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

        List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>());

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) {
                receivedMessages.add(new String(bytes, StandardCharsets.UTF_8));
            }
        };

        channel.basicConsume(queueName, true, consumer);

        TimeUnit.SECONDS.sleep(5L);

        return receivedMessages;
    }

テストできるように、受信したメッセージはListで戻すようにしています。

ここでのポイントは、ExchangeをProducer側と同じく「direct」で宣言していることと

        channel.exchangeDeclare("messageExchange", "direct");

Channel#queueBindで、routingKeyを第3引数に指定していることですね。なお、あとでも書きますが、Channel#queueBindでは複数のroutingKeyをバインドさせることができます。

        String queueName = channel.queueDeclare().getQueue();

        Arrays.asList(routingKeys).forEach(routingKey -> {
            try {
                channel.queueBind(queueName, "messageExchange", routingKey);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

ひとつのroutingKeyにバインドさせてみる

それでは、動かしてみましょう。まずは、ひとつのrougingKeyを使ってバインドさせてみます。

書いたコードは、こちら。

    @Test
    public void routingOneToOneBinding() throws IOException, TimeoutException, ExecutionException, InterruptedException {
        // Source messages
        List<String> messages = Arrays.asList("Hello World!", "Hello RabbitMQ!!");

        // Subscribe
        ExecutorService es = Executors.newFixedThreadPool(2);
        Future<List<String>> f1 = es.submit(() -> subscribe("black"));
        Future<List<String>> f2 = es.submit(() -> subscribe("white"));

        // Subscribe側が起動しきるまで、待機
        TimeUnit.SECONDS.sleep(2L);

        // Publish
        publish("black", messages.toArray(new String[messages.size()]));

        assertThat(f1.get())
                .containsExactlyElementsOf(messages);
        assertThat(f2.get())
                .isEmpty();

        es.shutdown();
        es.awaitTermination(5L, TimeUnit.SECONDS);
    }

送信するメッセージは2つとし、それぞれ「black」と「white」というroutingKeyでExchange、キューにバインドします。

        // Source messages
        List<String> messages = Arrays.asList("Hello World!", "Hello RabbitMQ!!");

        // Subscribe
        ExecutorService es = Executors.newFixedThreadPool(2);
        Future<List<String>> f1 = es.submit(() -> subscribe("black"));
        Future<List<String>> f2 = es.submit(() -> subscribe("white"));

そして、routingKey「black」で送信。

        // Publish
        publish("black", messages.toArray(new String[messages.size()]));

すると、「black」でバインドしたConsumer側はメッセージを受信しますが、「white」でバインドしたConsumer側はメッセージを受信しないという結果になります。

        assertThat(f1.get())
                .containsExactlyElementsOf(messages);
        assertThat(f2.get())
                .isEmpty();

ちゃんと振り分けてくれましたね。

複数のroutingKeyにバインドさせる

最後に、Consumerを複数のroutingKeyにバインドさせてみます。

書いたコードはこちら。

    @Test
    public void routingMultipleBinding() throws IOException, TimeoutException, ExecutionException, InterruptedException {
        // Subscribe
        ExecutorService es = Executors.newFixedThreadPool(2);
        Future<List<String>> f1 = es.submit(() -> subscribe("info"));
        Future<List<String>> f2 = es.submit(() -> subscribe("debug", "info", "warn", "error"));

        // Subscribe側が起動しきるまで、待機
        TimeUnit.SECONDS.sleep(2L);

        // Publish
        publish("debug", "debug-message1");
        publish("info", "info-message1", "info-message2");
        publish("warn", "warn-message1");
        publish("error", "error-message1", "error-message2");

        assertThat(f1.get())
                .containsExactlyElementsOf(Arrays.asList("info-message1", "info-message2"));
        assertThat(f2.get())
                .containsExactlyElementsOf(Arrays.asList("debug-message1", "info-message1", "info-message2", "warn-message1", "error-message1", "error-message2"));

        es.shutdown();
        es.awaitTermination(5L, TimeUnit.SECONDS);
    }

ここでは、元のチュートリアルと同じ感じで、ログレベルを模したものにしました。

Consumer側は、「info」のみにバインドさせたものと、「debug」、「info」、「warn」、「error」の4種類のroutingKeyにバインドさせたものの2つを用意します。

        // Subscribe
        ExecutorService es = Executors.newFixedThreadPool(2);
        Future<List<String>> f1 = es.submit(() -> subscribe("info"));
        Future<List<String>> f2 = es.submit(() -> subscribe("debug", "info", "warn", "error"));

Producer側は、「debug」、「info」、「warn」、「error」の4種類のroutingKeyに対して、それぞれメッセージを送ります。

        // Publish
        publish("debug", "debug-message1");
        publish("info", "info-message1", "info-message2");
        publish("warn", "warn-message1");
        publish("error", "error-message1", "error-message2");

すると、ひとつめのConsumerは「info」のみ、もうひとつのConsumerは「info」を含むすべてのメッセージを受信できました。

        assertThat(f1.get())
                .containsExactlyElementsOf(Arrays.asList("info-message1", "info-message2"));
        assertThat(f2.get())
                .containsExactlyElementsOf(Arrays.asList("debug-message1", "info-message1", "info-message2", "warn-message1", "error-message1", "error-message2"));

OKそうですね。

まとめ

今回は、複数のConsumerに対して、routingKeyでExchangeおよびキューの配信先をコントロールするといったチュートリアルを実践してみました。

RabbitMQのチュートリアルは動かしてみてもなかなかすぐにピンとこない感じがするのですが…これはけっこう動かしやすかったかなと思います。

もうちょっと、用語とかは理解していかないといけない気はしますけれどね。