CLOVER🍀

That was when it all began.

RabbitMQでPublish/Subscribeして遊ぶ

RabbitMQのチュートリアル3段、Publish/Subscribeをやってみます。

RabbitMQ - RabbitMQ tutorial - Publish/Subscribe

これまでのチュートリアルは、キューを作成して、ひとつのProducerがひとつのConsumerにメッセージを配信する構成でした。今度は、メッセージを複数のConsumerに送ります。このパターンは、「Publish/Subscribe」として知られています。

こちらのチュートリアルを見つつ、ひとつのProducerから投げたメッセージを、複数のConsumerが受け取るコードを書いて動かしてみます。

ここでは、こういう動作を行うコードを書きます。

  • Producerがメッセージを送る
  • キューがバッファとしてメッセージを保存する
  • Consumerがメッセージを受け取る

RabbitMQの考えとして、Producerはキューに直接メッセージを送らないというものがあるようです。実際、Producerはキューに入ったメッセージが配信されたかどうかは知りません。

代わりに、ProducerはExchangeにメッセージを送信するだけです。Exchangeは、受け取ったメッセージをどのように扱うべきかを知っています。特定のキューに追加されるべきか?多くのキューに追加されるべきか?または破棄されるべきか?これは、Exchangeの種類によって決まります。

今回のチュートリアルでは、このあたりを見ていくことになります。

準備

まずは、Maven依存関係。

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.2</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.4.1</version>
            <scope>test</scope>
        </dependency>

RabbitMQのJava Clientと、テストコード用のJUnit/AssertJを加えています。

また、RabbitMQはデフォルト状態+アクセス可能なユーザーを作成した状態で起動しているものとします。

テストコードの雛形

テストコードの全体像は、こんな感じにしました。
src/test/java/org/littlewings/rabbitmq/publishsubscribe/PublishSubscribeTest.java

package org.littlewings.rabbitmq.publishsubscribe;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class PublishSubscribeTest {
    @Test
    public void publishSubscribe() throws IOException, TimeoutException, ExecutionException, InterruptedException {
        // Source messages
        List<String> messages = Arrays.asList("Hello World!", "Hello RabbitMQ!!");

        // Subscribe
        ExecutorService es = Executors.newFixedThreadPool(2);
        Future<List<String>> f1 = es.submit(this::subscribe);
        Future<List<String>> f2 = es.submit(this::subscribe);

        // Subscribe側が起動しきるまで、待機
        TimeUnit.SECONDS.sleep(2L);

        // Publish
        publish(messages.toArray(new String[messages.size()]));

        assertThat(f1.get())
                .containsExactlyElementsOf(messages);
        assertThat(f2.get())
                .containsExactlyElementsOf(messages);

        es.shutdown();
        es.awaitTermination(5L, TimeUnit.SECONDS);
    }

    // ここに、Publish/Subscribeのコードを書く!
}

先にConsumerを2つ起動し、それからProducerにメッセージを投げてもらいます。投げるメッセージは2つで、それぞれ「Hello World!」と「Hello RabbitMQ!!」です。2つのComsumerからは、この両方のメッセージが受け取れているかどうか確認します。Comsumer側は、スレッドを2つ使って2つ分のComsumerがいることを表現します。

なお、ちょっと待機時間が入っているのは、Consumerが起動しきる前にProducerがメッセージを投げてしまうと、Consumerがいない状態のメッセージは破棄されてしまうからです…。

送信側(Producer)

Producer側のコードは、こんな感じ。

    private void publish(String... messages) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("kazuhira");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("logs", "fanout");

        Arrays.asList(messages).forEach(message -> {
            try {
                channel.basicPublish("logs", "", null, message.getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

        channel.close();
        connection.close();
    }

基本的にはキューを直接使っていた時と変わりませんが、変化があったのはここ。

        channel.exchangeDeclare("logs", "fanout");

Channel#exchangeDeclareを使用します。ここでは「logs」というのはExchangeの名前を指しますが、「fanout」というのがExchangeの種類を指します。

Exchangeの種類は「direct」、「topic」、「headers」、「fanout」のいずれかを使用できます。今回使用している「fanout」はとても単純なもので、すべてのキューにメッセージをブロードキャストするものだとか。

ブロードキャスト…?

RabbitMQ - AMQP 0-9-1 Model Explained

名前なしのExchange

利用可能なExchangeは、rabbitmqctlコマンドで確認することができます。

# sudo -u rabbitmq rabbitmqctl list_exchanges
Listing exchanges ...
amq.headers	headers
amq.topic	topic
	direct
amq.rabbitmq.log	topic
amq.fanout	fanout
amq.rabbitmq.trace	topic
amq.direct	direct
amq.match	headers

この「amq.*」となっているExchangeは、デフォルトの(名前なしの)Exchangeで作成すればすぐに使えるようになっています。

これまでのチュートリアルではExchangeについては意識しておらず、Channel#basicPublishを呼び出す時に、第1引数を空文字にしていました。

channel.basicPublish("", "hello", null, message.getBytes());

第1引数はExchangeの名前を表すらしく、ここで空文字を指定したということは名前なしのExchangeを使用していることになるようです。


そして、今回の例ではExchangeの名前を指定しています、と。

        channel.exchangeDeclare("logs", "fanout");

受信側(Comsumer)

Comsumer側のコードは、こんな感じになりました。

    private List<String> subscribe() throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("kazuhira");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("logs", "fanout");
        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, "logs", "");

        List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>());

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) {
                receivedMessages.add(new String(bytes, StandardCharsets.UTF_8));
            }
        };

        channel.basicConsume(queueName, true, consumer);

        TimeUnit.SECONDS.sleep(10L);

        return receivedMessages;
    }

Consumer側の特徴は、Channel#queueDeclare、queueBindの使用でしょうか。

単純なメッセージ送信、受信のコードでは、キューの名前を指定してConsumerの実装を紐づけていましたが、今回はProducer側でキューの作成をしていません。とはいえ、Channel#basicConsumerでの登録時にはキューの名前が必要になります。

そこで、引数なしのChannel#queueDeclareを呼び出すことで、ランダムな名前の空で、新しいキューを作成します。このキューは、クライアントが切断すると削除されます。
non-durable、exclusive、autodeleteなキューとなるそうな。

        channel.exchangeDeclare("logs", "fanout");
        String queueName = channel.queueDeclare().getQueue();

ランダムなキューの名前のサンプルは、「amq.gen-JzTY20BRgKO-HjmUJj0wLg」といった感じです。

そして、キューへのバインドを行います。この時に、先ほど取得したキューの名前、そしてExchangeの名前を使用します。

        channel.queueBind(queueName, "logs", "");

なお、現在のバインドされている状態は、「rabbitmqctl list_bindings」で確認することができます。

# sudo -u rabbitmq rabbitmqctl list_bindings
Listing bindings ...
	exchange	amq.gen-5rTz4mjD2G09MUACN_QIhw	queue	amq.gen-5rTz4mjD2G09MUACN_QIhw	[]
	exchange	amq.gen-x69BVWFhg5UPtTYJgtlg5A	queue	amq.gen-x69BVWFhg5UPtTYJgtlg5A	[]
logs	exchange	amq.gen-5rTz4mjD2G09MUACN_QIhw	queue		[]
logs	exchange	amq.gen-x69BVWFhg5UPtTYJgtlg5A	queue		[]

あとは、Consumerの実装を作成してChannel#basicConsumeで登録すると、Producerのメッセージ送信に反応して受信が行われます。

        channel.basicConsume(queueName, true, consumer);

まとめ

今回は、RabbitMQを使用してPublish/Subscribeについて書いてみました。

Exchangeが出てきたりと、新しいことも学びましたが、Exchangeの種類やキューへのバインドについてはちょっと理解が怪しいです。もう少し進めながら、ちょっとずつわかるといいなーという感じですね。