RabbitMQのチュートリアルで遊んでみようなシリーズ、今度はTopicsを扱います。
このチュートリアルの前には、Routingというものを扱っているのですが、そこでは複数の条件で振り分けることができないという制限があったのでこれを改善するというものらしいです。
リンク先のチュートリアルではTopicと呼ばれるものを使用して、Unix/Linuxにおけるsyslogライクなサンプルを試しています。
Topicとは
Topic Exchangeに送信されたメッセージは、ドット(.)で区切られた、ワードのリストであるrouting keyを構成することができます。ワード自体はなんでも構いませんが、通常はメッセージに対して意味、特徴のあるものを使用するでしょう。例えば、"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"など。
routing keyとして多くのワードを含むことができますが、その長さには255バイトまでの制限があります。
binding keyについても、同じ形式である必要があります。Topic Exchangeは、directと似通ったロジックを背後に備えています。特定のrouting keyと共に送信されたメッセージは、すべてのQueueに対してbinding keyにマッチするか確認が行われます。
しかし、binding keyには次の2つの重要な、特別な形式があります。
- *(star) … ひとつのワードを代替可能
- #(hash) … ゼロ、または複数のワードを代替可能
RabbitMQのチュートリアルでの説明は、これを
"
を例に解説しています。
このあたりは、このエントリではコードでパターンを示しながら書いていくとしましょう。
それでは、実際に使っていってみます。
準備
まず、RabbitMQ自体はすでにインストール、起動済みとします。また、認証が行われる前提のコードになっています。
実装を行うためのMaven依存関係は、以下の通り。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.3</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.5.1</version> <scope>test</scope> </dependency>
JUnitとAssertJは、テストコード用です。
テストコードの雛形
まず、テストコードの雛形部分を書いておきます。
src/test/java/org/littlewings/rabbitmq/topics/TopicsTest.java
package org.littlewings.rabbitmq.topics; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; public class TopicsTest { // ここに、テストを書く! }
Producer側を書く
最初に、メッセージを送信するProducer側を書いていきましょう。
作成したコードは、こちら。
public void publish(String routingKey, String... messages) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("kazuhira"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("messageExchange", "topic"); Arrays.asList(messages).forEach(message -> { try { channel.basicPublish("messageExchange", routingKey, null, message.getBytes(StandardCharsets.UTF_8)); } catch (IOException e) { throw new UncheckedIOException(e); } }); channel.close(); connection.close(); }
routing keyは、メソッド呼び出し時に指定できるようにしました。
Channel#exchangeDeclareを呼び出す際に、第2引数(type)を「topic」にしているところがポイントです。
channel.exchangeDeclare("messageExchange", "topic");
あとは、routing keyに沿ってChannel#publishします。
channel.basicPublish("messageExchange", routingKey, null, message.getBytes(StandardCharsets.UTF_8));
Consumer側
続いて、Consumer側を書いてみます。
作成したコードは、こちら。
public List<String> subscribe(String... routingKeys) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("kazuhira"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("messageExchange", "topic"); String queueName = channel.queueDeclare().getQueue(); Arrays.asList(routingKeys).forEach(routingKey -> { try { channel.queueBind(queueName, "messageExchange", routingKey); } catch (IOException e) { throw new UncheckedIOException(e); } }); List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>()); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) { receivedMessages.add(new String(bytes, StandardCharsets.UTF_8)); } }; channel.basicConsume(queueName, true, consumer); TimeUnit.SECONDS.sleep(5L); return receivedMessages; }
こちらも、routing key(というかbinding key)は複数受け取れるようにメソッドを宣言しています。
Channel#exchangeDeclareで「topic」を指定しているところは、Producer側と同じ。
channel.exchangeDeclare("messageExchange", "topic");
あとは、Queueにバインドして送信されてくるメッセージを待ちます。
確認
それでは、これらのコードでTopicを使って確認してみましょう。
特に、「*」と「#」が特殊な意味を持つということでしたね。このあたりを踏まえて。
では、まず「*」入りのものから。
@Test public void topicsAsterisk() throws IOException, TimeoutException, ExecutionException, InterruptedException { // Source messages List<String> infoLevelMessages = Arrays.asList("Connected target host.", "Disconnected target host."); List<String> errorLevelMessages = Arrays.asList("Connection timeout.", "Read timeout."); List<String> allLevelMessages = new ArrayList<>(infoLevelMessages); allLevelMessages.addAll(errorLevelMessages); // Subscribe ExecutorService es = Executors.newFixedThreadPool(8); Future<List<String>> f1 = es.submit(() -> subscribe("network.error.messages")); Future<List<String>> f2 = es.submit(() -> subscribe("network.error.*")); Future<List<String>> f3 = es.submit(() -> subscribe("network.*.*")); Future<List<String>> f4 = es.submit(() -> subscribe("network.*")); Future<List<String>> f5 = es.submit(() -> subscribe("*.info.*")); Future<List<String>> f6 = es.submit(() -> subscribe("*.*.*")); Future<List<String>> f7 = es.submit(() -> subscribe("*.*")); Future<List<String>> f8 = es.submit(() -> subscribe("*")); // Subscribe側が起動しきるまで、待機 TimeUnit.SECONDS.sleep(2L); // Publish publish("network.info.messages", infoLevelMessages.toArray(new String[infoLevelMessages.size()])); publish("network.error.messages", errorLevelMessages.toArray(new String[errorLevelMessages.size()])); assertThat(f1.get()) .containsExactlyElementsOf(errorLevelMessages); assertThat(f2.get()) .containsExactlyElementsOf(errorLevelMessages); assertThat(f3.get()) .containsExactlyElementsOf(allLevelMessages); assertThat(f4.get()) .isEmpty(); assertThat(f5.get()) .containsExactlyElementsOf(infoLevelMessages); assertThat(f6.get()) .containsExactlyElementsOf(allLevelMessages); assertThat(f7.get()) .isEmpty(); assertThat(f8.get()) .isEmpty(); es.shutdown(); es.awaitTermination(5L, TimeUnit.SECONDS); }
Consumer側を複数定義して、syslogっぽいワードで紐づけてみます。まったく「*」を含まない具体的なものから、一部のワードを「*」としたもの、3つのワードで今回は区切られていますが、数が足りないものなどなど。
Future<List<String>> f1 = es.submit(() -> subscribe("network.error.messages")); Future<List<String>> f2 = es.submit(() -> subscribe("network.error.*")); Future<List<String>> f3 = es.submit(() -> subscribe("network.*.*")); Future<List<String>> f4 = es.submit(() -> subscribe("network.*")); Future<List<String>> f5 = es.submit(() -> subscribe("*.info.*")); Future<List<String>> f6 = es.submit(() -> subscribe("*.*.*")); Future<List<String>> f7 = es.submit(() -> subscribe("*.*")); Future<List<String>> f8 = es.submit(() -> subscribe("*"));
で、メッセージ送信。キーはそれぞれ、「network.info.messages」と「network.error.messages」としました。INFOレベルとERRORレベル的な。
publish("network.info.messages", infoLevelMessages.toArray(new String[infoLevelMessages.size()])); publish("network.error.messages", errorLevelMessages.toArray(new String[errorLevelMessages.size()]));
先ほどSubscribeの設定をした、どのConsumerがメッセージを受け取れているか、というテストになります。
結果は、こんな感じに。
assertThat(f1.get()) .containsExactlyElementsOf(errorLevelMessages); assertThat(f2.get()) .containsExactlyElementsOf(errorLevelMessages); assertThat(f3.get()) .containsExactlyElementsOf(allLevelMessages); assertThat(f4.get()) .isEmpty(); assertThat(f5.get()) .containsExactlyElementsOf(infoLevelMessages); assertThat(f6.get()) .containsExactlyElementsOf(allLevelMessages); assertThat(f7.get()) .isEmpty(); assertThat(f8.get()) .isEmpty();
変数名の例が、ちょっと良くなかったかもですが…。
こういうのは「*」がワードを補完するのですべてのメッセージを受け取れていますが、
Future<List<String>> f2 = es.submit(() -> subscribe("network.error.*"));
こういう「*」が含まれていても「.」区切りの数が足りないものは、メッセージを受信できていません。
Future<List<String>> f4 = es.submit(() -> subscribe("network.*")); Future<List<String>> f7 = es.submit(() -> subscribe("*.*")); Future<List<String>> f8 = es.submit(() -> subscribe("*"));
なるほど、確かに
- *(star) … ひとつのワードを代替可能
ひとつのワードを代替可能、ですね。
また、こういうのはすべてのメッセージを受信できていますし、
Future<List<String>> f3 = es.submit(() -> subscribe("network.*.*")); Future<List<String>> f6 = es.submit(() -> subscribe("*.*.*"));
他のパターンだと、より具体的な部分でマッチしたメッセージだけが受信できていることが確認できます。
Future<List<String>> f1 = es.submit(() -> subscribe("network.error.messages")); Future<List<String>> f2 = es.submit(() -> subscribe("network.error.*")); Future<List<String>> f5 = es.submit(() -> subscribe("*.info.*"));
上2つはERRORレベルのみ、下はINFOレベルのみですね。
では、続いて今度は「#」を使ってみます。
@Test public void topicsHash() throws IOException, TimeoutException, ExecutionException, InterruptedException { // Source messages List<String> infoLevelMessages = Arrays.asList("Connected target host.", "Disconnected target host."); List<String> errorLevelMessages = Arrays.asList("Connection timeout.", "Read timeout."); List<String> allLevelMessages = new ArrayList<>(infoLevelMessages); allLevelMessages.addAll(errorLevelMessages); // Subscribe ExecutorService es = Executors.newFixedThreadPool(8); Future<List<String>> f1 = es.submit(() -> subscribe("network.error.messages")); Future<List<String>> f2 = es.submit(() -> subscribe("network.error.#")); Future<List<String>> f3 = es.submit(() -> subscribe("network.#.#")); Future<List<String>> f4 = es.submit(() -> subscribe("network.#")); Future<List<String>> f5 = es.submit(() -> subscribe("#.info.#")); Future<List<String>> f6 = es.submit(() -> subscribe("#.#.#")); Future<List<String>> f7 = es.submit(() -> subscribe("#.#")); Future<List<String>> f8 = es.submit(() -> subscribe("#")); // Subscribe側が起動しきるまで、待機 TimeUnit.SECONDS.sleep(2L); // Publish publish("network.info.messages", infoLevelMessages.toArray(new String[infoLevelMessages.size()])); publish("network.error.messages", errorLevelMessages.toArray(new String[errorLevelMessages.size()])); assertThat(f1.get()) .containsExactlyElementsOf(errorLevelMessages); assertThat(f2.get()) .containsExactlyElementsOf(errorLevelMessages); assertThat(f3.get()) .containsExactlyElementsOf(allLevelMessages); assertThat(f4.get()) .containsExactlyElementsOf(allLevelMessages); assertThat(f5.get()) .containsExactlyElementsOf(infoLevelMessages); assertThat(f6.get()) .containsExactlyElementsOf(allLevelMessages); assertThat(f7.get()) .containsExactlyElementsOf(allLevelMessages); assertThat(f8.get()) .containsExactlyElementsOf(allLevelMessages); es.shutdown(); es.awaitTermination(5L, TimeUnit.SECONDS); }
Subscribeするパターンは先ほどとほぼ同じですが、「*」だった部分を「#」に変えています。
Future<List<String>> f1 = es.submit(() -> subscribe("network.error.messages")); Future<List<String>> f2 = es.submit(() -> subscribe("network.error.#")); Future<List<String>> f3 = es.submit(() -> subscribe("network.#.#")); Future<List<String>> f4 = es.submit(() -> subscribe("network.#")); Future<List<String>> f5 = es.submit(() -> subscribe("#.info.#")); Future<List<String>> f6 = es.submit(() -> subscribe("#.#.#")); Future<List<String>> f7 = es.submit(() -> subscribe("#.#")); Future<List<String>> f8 = es.submit(() -> subscribe("#"));
すると、結果が先ほどとは変わってメッセージを受信しないConsumerがいなくなりました。
assertThat(f1.get()) .containsExactlyElementsOf(errorLevelMessages); assertThat(f2.get()) .containsExactlyElementsOf(errorLevelMessages); assertThat(f3.get()) .containsExactlyElementsOf(allLevelMessages); assertThat(f4.get()) .containsExactlyElementsOf(allLevelMessages); assertThat(f5.get()) .containsExactlyElementsOf(infoLevelMessages); assertThat(f6.get()) .containsExactlyElementsOf(allLevelMessages); assertThat(f7.get()) .containsExactlyElementsOf(allLevelMessages); assertThat(f8.get()) .containsExactlyElementsOf(allLevelMessages);
「*」を使っていた時は、「.」の部分を含めたワードの数が合わないものはメッセージを受信できていませんでしたが、「#」の場合は複数のワードとしても機能するからですね。
- #(hash) … ゼロ、または複数のワードを代替可能
なるほど。
まとめ
RabbitMQのTopic Exchangeを使って、Routingのチュートリアルよりも複雑なメッセージの送受信の振り分けの例を書いてみました。
実際に使って書いてみると、動きがわかってなかなか面白いですね。