CLOVER🍀

That was when it all began.

RabbitMQで、キューにConsumerを複数つけて動かす

RabbitMQを、チュートリアルに沿って勉強していこうという流れで。前回はインストールとHello Worldでしたが、今回はこちら。

RabbitMQ tutorial - Work Queues — RabbitMQ

チュートリアルの最初(RabbitMQ tutorial - "Hello World!" — RabbitMQ)では、キューに対してメッセージを投げる役(Producer)と受ける役(Consumer)がひとりずつでしたが、今回はメッセージを受け取る役が2つとなります。

これで、複数のWorker(というかConsumer)がキューに入れられたタスクを受け取って、それぞれ作業を行う…ということができるというわけですね。まあ、チュートリアル自体はとても簡単なものですが。

準備

RabbitMQのサーバーと、アクセス可能できるユーザーは作成済みとします。

今回はJavaクライアントからアクセスするので、Maven依存関係には以下のようにクライアントライブラリを追加します。

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.2</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.4.1</version>
            <scope>test</scope>
        </dependency>

JUnitとAssertJはテスト用です。

テストコードの雛形作成

では、最初にテストコードの雛形を書いてみます。
src/test/java/org/littlewings/rabbitmq/workqueues/WorkQueuesTest.java

package org.littlewings.rabbitmq.workqueues;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;

public class WorkQueuesTest {
    // ここに、テストを書く!
}

利用するimport文は、だいたいこのあたりですね、と。

そして、メッセージを2通投げて、2つのスレッドでそれぞれ動かすConsumerでメッセージを受け取るコードを書いてみます。

    @Test
    public void workQueues() throws InterruptedException, ExecutionException, IOException, TimeoutException {
        String firstMessage = "Hello World!!";
        String secondMessage = "Hello RabbitMQ!!";

        Set<String> receivedMessages = new HashSet<>(Arrays.asList(firstMessage, secondMessage));

        // setup workers
        ExecutorService es = Executors.newFixedThreadPool(2);
        Future<String> f1 = es.submit(this::receive);
        Future<String> f2 = es.submit(this::receive);

        // messages send
        send(firstMessage);
        send(secondMessage);

        receivedMessages.remove(f1.get());
        receivedMessages.remove(f2.get());

        assertThat(receivedMessages).isEmpty();

        es.shutdown();
        es.awaitTermination(10L, TimeUnit.SECONDS);
    }

受け取り側は、受信したメッセージを返すようにして、合わせて2通のメッセージが個々のConsumerで受け取れたかどうかを確認します。

1通目は「Hello World!!」で、2通目は「Hello RabbitMQ!!」です。

受信側(Consumer)

今回は、先に受信側からコードを載せていきます。

    public String receive() throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("kazuhira");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        int prefetchCount = 1;
        channel.basicQos(prefetchCount);

        boolean durable = true;
        channel.queueDeclare("my-queue", durable, false, false, null);

        AtomicInteger counter = new AtomicInteger();
        AtomicReference<String> receivedMessage = new AtomicReference<>();

        boolean autoAck = false;
        channel.basicConsume("my-queue", autoAck, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    assertThat(envelope.getRoutingKey()).isEqualTo("my-queue");

                    receivedMessage.set(new String(body, StandardCharsets.UTF_8));
                    counter.incrementAndGet();

                    try {
                        TimeUnit.SECONDS.sleep(3L);
                    } catch (InterruptedException e) {
                    }
                } finally {
                    boolean multiple = false;
                    channel.basicAck(envelope.getDeliveryTag(), multiple);
                }
            }
        });

        TimeUnit.SECONDS.sleep(5L);

        assertThat(counter.get()).isEqualTo(1);

        channel.close();
        connection.close();

        return receivedMessage.get();
    }

このメソッドで今回は2つの受信側、Consumerを使うわけですが、デフォルトではメッセージの受信はラウンド・ロビンで行われるようです。

前回のHello World時のコードと変更があるのは、まずはChannel#basicConsumeの第2引数(autoAck)をfalseにして

        boolean autoAck = false;
        channel.basicConsume("my-queue", autoAck, new DefaultConsumer(channel) {

Consumer#handleDelivery中で、メッセージを処理し終わったらChannel#basicAckを呼び出していること。

                    boolean multiple = false;
                    channel.basicAck(envelope.getDeliveryTag(), multiple);

これで、Consumerがメッセージを処理し終わったので、RabbitMQ側にメッセージを削除してよいことを表すAckを送信するようです。
ConsumerからAckが送られてこない場合は、再度別のConsumerにRabbitMQ側からメッセージを再送するのだとか。ただ、再送のタイミングはConsumerが落ちた時のようで、それには時間がかかるかも…と。

メッセージがいずれかのConsumerで処理され、失われないようにする仕組みってことですね。

前回はautoAckがtrueだったので、メッセージがConsumerに渡ったらAckをConsumerから明示的に送らずともRabbitMQから削除されてしまう、ということなのでしょう。

続いて、Channel#queueDeclareで第2引数(durable)をtrueに。

        boolean durable = true;
        channel.queueDeclare("my-queue", durable, false, false, null);

こうすることで、RabbitMQサーバーが停止しても、メッセージが失われないようにすることができるようです。なお、このコードはConsumer側でもProducer側でも設定する必要があります。

そして、最後にChannel#basicQosの設定。

        int prefetchCount = 1;
        channel.basicQos(prefetchCount);

デフォルトでは、RabbitMQはConsumerに均等にメッセージを送るようですが、Consumer側の負荷には偏りがある可能性があります。ここで、処理中のConsumerにメッセージを送信しないように、代わりに次のConsumerにメッセージを送信するようにするために入れる設定が、こちらのbasicQosのようです。これで、あるメッセージを複数のConsumerに同時に伝えることがなくなる、と。

送信側(Producer)

送信側(Producer)のコードは、こんな感じで作成。

    public void send(String message) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("kazuhira");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        boolean durable = true;
        channel.queueDeclare("my-queue", durable, false, false, null);

        channel.basicPublish("", "my-queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

        channel.close();
        connection.close();
    }

基本的にはHello Worldの時と変わりませんが、Channel#queueDeclareの第2引数(durable)をtrueにし、Channel#basicPublishでメッセージを送る時に第3引数にMessageProperties.PERSISTENT_TEXT_PLAINを指定しています。


Channel#queueDeclareのdurableについてはConsumer側と同じですが、メッセージが失われないようにどのようにRabbitMQが保存(Persistence)するのかを指定するようです。

        channel.basicPublish("", "my-queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

今回は、テキストですと。他には、Basicとかあるようです。

Producer側の変更点は、このくらいです。

まとめ

今回は、複数のConsumerを使ったプログラムを書いてみましたが、合わせてAckやメッセージの永続化(durable)についても知ることができました。

まだキューに慣れないので、感覚的に掴みづらい部分もけっこうあるのですが、気長にチュートリアルをこなして頑張ってみようと思います。