RabbitMQのチュートリアルで遊んでみるシリーズ、今度はRoutingです。
RabbitMQ - RabbitMQ tutorial - Routing
このひとつ前のチュートリアルでは、Producer側が送ったメッセージがConsumerにブロードキャストするといったものでした。
今回のチュートリアルでは、Consumer側がメッセージのサブセットを受け取れるようにしてみます。このエントリでは、単純なメッセージの振り分けに加えて、元のチュートリアルのログ出力サンプルをマネたものを作ってみたいと思います。
メッセージをブロードキャストしていた時の振り返り
Producer側から送ったメッセージをブロードキャストしていた時のConsumer側のコードは、こんな感じでした。
String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "logs", "");
Channel#queueBindの第3引数が空文字です。
この第3引数は、routingKeyという特別なもので、このチュートリアルではこれをバインディングキーと呼ぶそうです(Channel#queueBindで、Exchangeとキューをバインドして関連付けますが、その時のキーとなっているので)。
バインディングキーの意味は、Exchangeの種類に依存します。このひとつ前のチュートリアルでは、Exchangeの種類は「fanout」となっていましたが、この場合はバインディングキーは無視されるそうです。
Exchangeの種類については、ここを見ればよいみたいですね。
RabbitMQ - AMQP 0-9-1 Model Explained
それでは、進めてみましょう。
準備
まず、RabbitMQは起動済みとします。
そして、クライアントライブラリにJavaを利用するので、Maven依存関係を定義します。こちらです。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.4.1</version> <scope>test</scope> </dependency>
テストコード用に、JUnitとAssertJ付きです。
テストコードの雛形
テストコードで動作確認をしますが、import文などの大枠はこんな感じです。
src/test/java/org/littlewings/rabbitmq/routing/RoutingTest.java
package org.littlewings.rabbitmq.routing; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; public class RoutingTest { // ここに、テストを書く! }
Producer側を書く
ここでは、Producerから送信するメッセージを、特定のExchange、キューにバインドしたConsumerに送るコードを書きます。
前回のチュートリアルでは「fanout」でブロードキャストしたわけですが、今回は「direct」を使用します。
「direct」を使用すると、routingKeyにマッチしたキューにメッセージが送信されることになります。
書いたコードは、こちらです。
public void publish(String routingKey, String... messages) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("kazuhira"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("messageExchange", "direct"); Arrays.asList(messages).forEach(message -> { try { channel.basicPublish("messageExchange", routingKey, null, message.getBytes(StandardCharsets.UTF_8)); } catch (IOException e) { throw new UncheckedIOException(e); } }); channel.close(); connection.close(); }
routingKeyは、メソッドの引数としてもらうようにしました。
Exchangeを「direct」で定義して
channel.exchangeDeclare("messageExchange", "direct");
メッセージをroutingKeyを指定して送信します。Channel#basicPublishの第2引数に、routingKeyを指定しているところがポイントです。
Arrays.asList(messages).forEach(message -> { try { channel.basicPublish("messageExchange", routingKey, null, message.getBytes(StandardCharsets.UTF_8)); } catch (IOException e) { throw new UncheckedIOException(e); } });
Consumer側を書く
続いて、Consumer側を書いてみます。
書いたコードは、こちら。
public List<String> subscribe(String... routingKeys) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("kazuhira"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("messageExchange", "direct"); String queueName = channel.queueDeclare().getQueue(); Arrays.asList(routingKeys).forEach(routingKey -> { try { channel.queueBind(queueName, "messageExchange", routingKey); } catch (IOException e) { throw new UncheckedIOException(e); } }); List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>()); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) { receivedMessages.add(new String(bytes, StandardCharsets.UTF_8)); } }; channel.basicConsume(queueName, true, consumer); TimeUnit.SECONDS.sleep(5L); return receivedMessages; }
テストできるように、受信したメッセージはListで戻すようにしています。
ここでのポイントは、ExchangeをProducer側と同じく「direct」で宣言していることと
channel.exchangeDeclare("messageExchange", "direct");
Channel#queueBindで、routingKeyを第3引数に指定していることですね。なお、あとでも書きますが、Channel#queueBindでは複数のroutingKeyをバインドさせることができます。
String queueName = channel.queueDeclare().getQueue(); Arrays.asList(routingKeys).forEach(routingKey -> { try { channel.queueBind(queueName, "messageExchange", routingKey); } catch (IOException e) { throw new UncheckedIOException(e); } });
ひとつのroutingKeyにバインドさせてみる
それでは、動かしてみましょう。まずは、ひとつのrougingKeyを使ってバインドさせてみます。
書いたコードは、こちら。
@Test public void routingOneToOneBinding() throws IOException, TimeoutException, ExecutionException, InterruptedException { // Source messages List<String> messages = Arrays.asList("Hello World!", "Hello RabbitMQ!!"); // Subscribe ExecutorService es = Executors.newFixedThreadPool(2); Future<List<String>> f1 = es.submit(() -> subscribe("black")); Future<List<String>> f2 = es.submit(() -> subscribe("white")); // Subscribe側が起動しきるまで、待機 TimeUnit.SECONDS.sleep(2L); // Publish publish("black", messages.toArray(new String[messages.size()])); assertThat(f1.get()) .containsExactlyElementsOf(messages); assertThat(f2.get()) .isEmpty(); es.shutdown(); es.awaitTermination(5L, TimeUnit.SECONDS); }
送信するメッセージは2つとし、それぞれ「black」と「white」というroutingKeyでExchange、キューにバインドします。
// Source messages List<String> messages = Arrays.asList("Hello World!", "Hello RabbitMQ!!"); // Subscribe ExecutorService es = Executors.newFixedThreadPool(2); Future<List<String>> f1 = es.submit(() -> subscribe("black")); Future<List<String>> f2 = es.submit(() -> subscribe("white"));
そして、routingKey「black」で送信。
// Publish publish("black", messages.toArray(new String[messages.size()]));
すると、「black」でバインドしたConsumer側はメッセージを受信しますが、「white」でバインドしたConsumer側はメッセージを受信しないという結果になります。
assertThat(f1.get()) .containsExactlyElementsOf(messages); assertThat(f2.get()) .isEmpty();
ちゃんと振り分けてくれましたね。
複数のroutingKeyにバインドさせる
最後に、Consumerを複数のroutingKeyにバインドさせてみます。
書いたコードはこちら。
@Test public void routingMultipleBinding() throws IOException, TimeoutException, ExecutionException, InterruptedException { // Subscribe ExecutorService es = Executors.newFixedThreadPool(2); Future<List<String>> f1 = es.submit(() -> subscribe("info")); Future<List<String>> f2 = es.submit(() -> subscribe("debug", "info", "warn", "error")); // Subscribe側が起動しきるまで、待機 TimeUnit.SECONDS.sleep(2L); // Publish publish("debug", "debug-message1"); publish("info", "info-message1", "info-message2"); publish("warn", "warn-message1"); publish("error", "error-message1", "error-message2"); assertThat(f1.get()) .containsExactlyElementsOf(Arrays.asList("info-message1", "info-message2")); assertThat(f2.get()) .containsExactlyElementsOf(Arrays.asList("debug-message1", "info-message1", "info-message2", "warn-message1", "error-message1", "error-message2")); es.shutdown(); es.awaitTermination(5L, TimeUnit.SECONDS); }
ここでは、元のチュートリアルと同じ感じで、ログレベルを模したものにしました。
Consumer側は、「info」のみにバインドさせたものと、「debug」、「info」、「warn」、「error」の4種類のroutingKeyにバインドさせたものの2つを用意します。
// Subscribe ExecutorService es = Executors.newFixedThreadPool(2); Future<List<String>> f1 = es.submit(() -> subscribe("info")); Future<List<String>> f2 = es.submit(() -> subscribe("debug", "info", "warn", "error"));
Producer側は、「debug」、「info」、「warn」、「error」の4種類のroutingKeyに対して、それぞれメッセージを送ります。
// Publish publish("debug", "debug-message1"); publish("info", "info-message1", "info-message2"); publish("warn", "warn-message1"); publish("error", "error-message1", "error-message2");
すると、ひとつめのConsumerは「info」のみ、もうひとつのConsumerは「info」を含むすべてのメッセージを受信できました。
assertThat(f1.get()) .containsExactlyElementsOf(Arrays.asList("info-message1", "info-message2")); assertThat(f2.get()) .containsExactlyElementsOf(Arrays.asList("debug-message1", "info-message1", "info-message2", "warn-message1", "error-message1", "error-message2"));
OKそうですね。