RabbitMQのチュートリアル3段、Publish/Subscribeをやってみます。
RabbitMQ - RabbitMQ tutorial - Publish/Subscribe
これまでのチュートリアルは、キューを作成して、ひとつのProducerがひとつのConsumerにメッセージを配信する構成でした。今度は、メッセージを複数のConsumerに送ります。このパターンは、「Publish/Subscribe」として知られています。
こちらのチュートリアルを見つつ、ひとつのProducerから投げたメッセージを、複数のConsumerが受け取るコードを書いて動かしてみます。
ここでは、こういう動作を行うコードを書きます。
- Producerがメッセージを送る
- キューがバッファとしてメッセージを保存する
- Consumerがメッセージを受け取る
RabbitMQの考えとして、Producerはキューに直接メッセージを送らないというものがあるようです。実際、Producerはキューに入ったメッセージが配信されたかどうかは知りません。
代わりに、ProducerはExchangeにメッセージを送信するだけです。Exchangeは、受け取ったメッセージをどのように扱うべきかを知っています。特定のキューに追加されるべきか?多くのキューに追加されるべきか?または破棄されるべきか?これは、Exchangeの種類によって決まります。
今回のチュートリアルでは、このあたりを見ていくことになります。
準備
まずは、Maven依存関係。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.4.1</version> <scope>test</scope> </dependency>
RabbitMQのJava Clientと、テストコード用のJUnit/AssertJを加えています。
また、RabbitMQはデフォルト状態+アクセス可能なユーザーを作成した状態で起動しているものとします。
テストコードの雛形
テストコードの全体像は、こんな感じにしました。
src/test/java/org/littlewings/rabbitmq/publishsubscribe/PublishSubscribeTest.java
package org.littlewings.rabbitmq.publishsubscribe; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; public class PublishSubscribeTest { @Test public void publishSubscribe() throws IOException, TimeoutException, ExecutionException, InterruptedException { // Source messages List<String> messages = Arrays.asList("Hello World!", "Hello RabbitMQ!!"); // Subscribe ExecutorService es = Executors.newFixedThreadPool(2); Future<List<String>> f1 = es.submit(this::subscribe); Future<List<String>> f2 = es.submit(this::subscribe); // Subscribe側が起動しきるまで、待機 TimeUnit.SECONDS.sleep(2L); // Publish publish(messages.toArray(new String[messages.size()])); assertThat(f1.get()) .containsExactlyElementsOf(messages); assertThat(f2.get()) .containsExactlyElementsOf(messages); es.shutdown(); es.awaitTermination(5L, TimeUnit.SECONDS); } // ここに、Publish/Subscribeのコードを書く! }
先にConsumerを2つ起動し、それからProducerにメッセージを投げてもらいます。投げるメッセージは2つで、それぞれ「Hello World!」と「Hello RabbitMQ!!」です。2つのComsumerからは、この両方のメッセージが受け取れているかどうか確認します。Comsumer側は、スレッドを2つ使って2つ分のComsumerがいることを表現します。
なお、ちょっと待機時間が入っているのは、Consumerが起動しきる前にProducerがメッセージを投げてしまうと、Consumerがいない状態のメッセージは破棄されてしまうからです…。
送信側(Producer)
Producer側のコードは、こんな感じ。
private void publish(String... messages) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("kazuhira"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs", "fanout"); Arrays.asList(messages).forEach(message -> { try { channel.basicPublish("logs", "", null, message.getBytes(StandardCharsets.UTF_8)); } catch (IOException e) { throw new UncheckedIOException(e); } }); channel.close(); connection.close(); }
基本的にはキューを直接使っていた時と変わりませんが、変化があったのはここ。
channel.exchangeDeclare("logs", "fanout");
Channel#exchangeDeclareを使用します。ここでは「logs」というのはExchangeの名前を指しますが、「fanout」というのがExchangeの種類を指します。
Exchangeの種類は「direct」、「topic」、「headers」、「fanout」のいずれかを使用できます。今回使用している「fanout」はとても単純なもので、すべてのキューにメッセージをブロードキャストするものだとか。
ブロードキャスト…?
RabbitMQ - AMQP 0-9-1 Model Explained
名前なしのExchange
利用可能なExchangeは、rabbitmqctlコマンドで確認することができます。
# sudo -u rabbitmq rabbitmqctl list_exchanges
Listing exchanges ...
amq.headers headers
amq.topic topic
direct
amq.rabbitmq.log topic
amq.fanout fanout
amq.rabbitmq.trace topic
amq.direct direct
amq.match headers
この「amq.*」となっているExchangeは、デフォルトの(名前なしの)Exchangeで作成すればすぐに使えるようになっています。
これまでのチュートリアルではExchangeについては意識しておらず、Channel#basicPublishを呼び出す時に、第1引数を空文字にしていました。
channel.basicPublish("", "hello", null, message.getBytes());
第1引数はExchangeの名前を表すらしく、ここで空文字を指定したということは名前なしのExchangeを使用していることになるようです。
そして、今回の例ではExchangeの名前を指定しています、と。
channel.exchangeDeclare("logs", "fanout");
受信側(Comsumer)
Comsumer側のコードは、こんな感じになりました。
private List<String> subscribe() throws IOException, InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("kazuhira"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs", "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "logs", ""); List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>()); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) { receivedMessages.add(new String(bytes, StandardCharsets.UTF_8)); } }; channel.basicConsume(queueName, true, consumer); TimeUnit.SECONDS.sleep(10L); return receivedMessages; }
Consumer側の特徴は、Channel#queueDeclare、queueBindの使用でしょうか。
単純なメッセージ送信、受信のコードでは、キューの名前を指定してConsumerの実装を紐づけていましたが、今回はProducer側でキューの作成をしていません。とはいえ、Channel#basicConsumerでの登録時にはキューの名前が必要になります。
そこで、引数なしのChannel#queueDeclareを呼び出すことで、ランダムな名前の空で、新しいキューを作成します。このキューは、クライアントが切断すると削除されます。
non-durable、exclusive、autodeleteなキューとなるそうな。
channel.exchangeDeclare("logs", "fanout"); String queueName = channel.queueDeclare().getQueue();
ランダムなキューの名前のサンプルは、「amq.gen-JzTY20BRgKO-HjmUJj0wLg」といった感じです。
そして、キューへのバインドを行います。この時に、先ほど取得したキューの名前、そしてExchangeの名前を使用します。
channel.queueBind(queueName, "logs", "");
なお、現在のバインドされている状態は、「rabbitmqctl list_bindings」で確認することができます。
# sudo -u rabbitmq rabbitmqctl list_bindings Listing bindings ... exchange amq.gen-5rTz4mjD2G09MUACN_QIhw queue amq.gen-5rTz4mjD2G09MUACN_QIhw [] exchange amq.gen-x69BVWFhg5UPtTYJgtlg5A queue amq.gen-x69BVWFhg5UPtTYJgtlg5A [] logs exchange amq.gen-5rTz4mjD2G09MUACN_QIhw queue [] logs exchange amq.gen-x69BVWFhg5UPtTYJgtlg5A queue []
あとは、Consumerの実装を作成してChannel#basicConsumeで登録すると、Producerのメッセージ送信に反応して受信が行われます。
channel.basicConsume(queueName, true, consumer);
まとめ
今回は、RabbitMQを使用してPublish/Subscribeについて書いてみました。
Exchangeが出てきたりと、新しいことも学びましたが、Exchangeの種類やキューへのバインドについてはちょっと理解が怪しいです。もう少し進めながら、ちょっとずつわかるといいなーという感じですね。