RabbitMQを、チュートリアルに沿って勉強していこうという流れで。前回はインストールとHello Worldでしたが、今回はこちら。
RabbitMQ tutorial - Work Queues — RabbitMQ
チュートリアルの最初(RabbitMQ tutorial - "Hello World!" — RabbitMQ)では、キューに対してメッセージを投げる役(Producer)と受ける役(Consumer)がひとりずつでしたが、今回はメッセージを受け取る役が2つとなります。
これで、複数のWorker(というかConsumer)がキューに入れられたタスクを受け取って、それぞれ作業を行う…ということができるというわけですね。まあ、チュートリアル自体はとても簡単なものですが。
準備
RabbitMQのサーバーと、アクセス可能できるユーザーは作成済みとします。
今回はJavaクライアントからアクセスするので、Maven依存関係には以下のようにクライアントライブラリを追加します。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.4.1</version> <scope>test</scope> </dependency>
JUnitとAssertJはテスト用です。
テストコードの雛形作成
では、最初にテストコードの雛形を書いてみます。
src/test/java/org/littlewings/rabbitmq/workqueues/WorkQueuesTest.java
package org.littlewings.rabbitmq.workqueues; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.MessageProperties; import org.junit.Test; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; public class WorkQueuesTest { // ここに、テストを書く! }
利用するimport文は、だいたいこのあたりですね、と。
そして、メッセージを2通投げて、2つのスレッドでそれぞれ動かすConsumerでメッセージを受け取るコードを書いてみます。
@Test public void workQueues() throws InterruptedException, ExecutionException, IOException, TimeoutException { String firstMessage = "Hello World!!"; String secondMessage = "Hello RabbitMQ!!"; Set<String> receivedMessages = new HashSet<>(Arrays.asList(firstMessage, secondMessage)); // setup workers ExecutorService es = Executors.newFixedThreadPool(2); Future<String> f1 = es.submit(this::receive); Future<String> f2 = es.submit(this::receive); // messages send send(firstMessage); send(secondMessage); receivedMessages.remove(f1.get()); receivedMessages.remove(f2.get()); assertThat(receivedMessages).isEmpty(); es.shutdown(); es.awaitTermination(10L, TimeUnit.SECONDS); }
受け取り側は、受信したメッセージを返すようにして、合わせて2通のメッセージが個々のConsumerで受け取れたかどうかを確認します。
1通目は「Hello World!!」で、2通目は「Hello RabbitMQ!!」です。
受信側(Consumer)
今回は、先に受信側からコードを載せていきます。
public String receive() throws IOException, InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("kazuhira"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); int prefetchCount = 1; channel.basicQos(prefetchCount); boolean durable = true; channel.queueDeclare("my-queue", durable, false, false, null); AtomicInteger counter = new AtomicInteger(); AtomicReference<String> receivedMessage = new AtomicReference<>(); boolean autoAck = false; channel.basicConsume("my-queue", autoAck, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { assertThat(envelope.getRoutingKey()).isEqualTo("my-queue"); receivedMessage.set(new String(body, StandardCharsets.UTF_8)); counter.incrementAndGet(); try { TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { } } finally { boolean multiple = false; channel.basicAck(envelope.getDeliveryTag(), multiple); } } }); TimeUnit.SECONDS.sleep(5L); assertThat(counter.get()).isEqualTo(1); channel.close(); connection.close(); return receivedMessage.get(); }
このメソッドで今回は2つの受信側、Consumerを使うわけですが、デフォルトではメッセージの受信はラウンド・ロビンで行われるようです。
前回のHello World時のコードと変更があるのは、まずはChannel#basicConsumeの第2引数(autoAck)をfalseにして
boolean autoAck = false; channel.basicConsume("my-queue", autoAck, new DefaultConsumer(channel) {
Consumer#handleDelivery中で、メッセージを処理し終わったらChannel#basicAckを呼び出していること。
boolean multiple = false; channel.basicAck(envelope.getDeliveryTag(), multiple);
これで、Consumerがメッセージを処理し終わったので、RabbitMQ側にメッセージを削除してよいことを表すAckを送信するようです。
ConsumerからAckが送られてこない場合は、再度別のConsumerにRabbitMQ側からメッセージを再送するのだとか。ただ、再送のタイミングはConsumerが落ちた時のようで、それには時間がかかるかも…と。
メッセージがいずれかのConsumerで処理され、失われないようにする仕組みってことですね。
前回はautoAckがtrueだったので、メッセージがConsumerに渡ったらAckをConsumerから明示的に送らずともRabbitMQから削除されてしまう、ということなのでしょう。
続いて、Channel#queueDeclareで第2引数(durable)をtrueに。
boolean durable = true; channel.queueDeclare("my-queue", durable, false, false, null);
こうすることで、RabbitMQサーバーが停止しても、メッセージが失われないようにすることができるようです。なお、このコードはConsumer側でもProducer側でも設定する必要があります。
そして、最後にChannel#basicQosの設定。
int prefetchCount = 1; channel.basicQos(prefetchCount);
デフォルトでは、RabbitMQはConsumerに均等にメッセージを送るようですが、Consumer側の負荷には偏りがある可能性があります。ここで、処理中のConsumerにメッセージを送信しないように、代わりに次のConsumerにメッセージを送信するようにするために入れる設定が、こちらのbasicQosのようです。これで、あるメッセージを複数のConsumerに同時に伝えることがなくなる、と。
送信側(Producer)
送信側(Producer)のコードは、こんな感じで作成。
public void send(String message) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("kazuhira"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); boolean durable = true; channel.queueDeclare("my-queue", durable, false, false, null); channel.basicPublish("", "my-queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); channel.close(); connection.close(); }
基本的にはHello Worldの時と変わりませんが、Channel#queueDeclareの第2引数(durable)をtrueにし、Channel#basicPublishでメッセージを送る時に第3引数にMessageProperties.PERSISTENT_TEXT_PLAINを指定しています。
Channel#queueDeclareのdurableについてはConsumer側と同じですが、メッセージが失われないようにどのようにRabbitMQが保存(Persistence)するのかを指定するようです。
channel.basicPublish("", "my-queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
今回は、テキストですと。他には、Basicとかあるようです。
Producer側の変更点は、このくらいです。
まとめ
今回は、複数のConsumerを使ったプログラムを書いてみましたが、合わせてAckやメッセージの永続化(durable)についても知ることができました。
まだキューに慣れないので、感覚的に掴みづらい部分もけっこうあるのですが、気長にチュートリアルをこなして頑張ってみようと思います。