RabbitMQのチュートリアルの最後にある、RPCを試してみます。
タイトルだけ見ると、「キューなのにRPC??」という感じですが、よくよく見るとこんな感じっぽいです。
- BlockingQueueを内部に持ったConsumerの拡張クラス、QueueingConsumerを受信したメッセージをBlockingQueueに格納
- Client/ServerともにQueueingConsumer#nextDeliveryを呼び出してメッセージを受信するまでブロックし、受信したら処理開始
- Server側は、メッセージの送信側にメッセージを送り返す
このチュートリアルのサンプルでは、Client/Serverの両方ともChannel#basicConsumeを使用してキューにConsumerを登録します。
あとはコードを見た方が早いような気がしますが…、一応、チュートリアルの注意としてはRCPを使うことについて、こんな注意事項が書いてあります。
- 呼び出している処理が実はローカルの関数呼び出しではなく、RPCの場合、しかもプログラマがそれに気付いていない場合は問題が発生するかもしれない
- システムを複雑化し、デバッグをしづらくする
- RPCの乱用は、ソフトウェアのシンプルさを失わせメンテナンスの難しいスパゲッティコードを生み出すかもしれない
とまあ、けっこう否定的ですね。使うなら、リモート呼び出しであることをちゃんとドキュメント化しなさいとか、エラーケース(Server側がダウンしている時など)について考慮しなさいとかが書かれています。
前置きはこれくらいにして、書いていってみましょう。
準備
RabbitMQ自体は、起動済みとします。また、アクセスの際には認証を行います(ID/パスワードは、コードに書いています)。
Maven依存関係は、こちら。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.3</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.5.2</version> <scope>test</scope> </dependency>
JUnitとAssertJは、テストコード用です。
テストコードの雛形
まずは、テストコードの大枠だけを書いてみます。
import文などを含めて、こんな感じです。
src/test/java/org/littlewings/rabbitmq/rpc/RpcTest.java
package org.littlewings.rabbitmq.rpc; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; public class RpcTest { // ここに、テストを書く! }
Client側
では、まずはClient側のコードを書いてみます。
public String client(String message) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("kazuhira"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String replyQueueName = channel.queueDeclare().getQueue(); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); String uuid = UUID.randomUUID().toString(); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .correlationId(uuid) .replyTo(replyQueueName) .build(); channel.basicPublish("", "rpc_queue", properties, message.getBytes(StandardCharsets.UTF_8)); String receivedMessage; QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(uuid)) { receivedMessage = new String(delivery.getBody(), StandardCharsets.UTF_8); } else { receivedMessage = null; } TimeUnit.SECONDS.sleep(5L); channel.close(); connection.close(); return receivedMessage; }
このメソッドは、最終的にはServer側から戻ってきたメッセージを返しています。
キューは簡易的に宣言していますが、ここではQueueingConsumerというクラスが登場します。
String replyQueueName = channel.queueDeclare().getQueue(); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer);
このクラスをChannel#basicConsumeで登録しますが、これまではConsumerのサブクラスを作成していましたが、今回はこのQueueingConsumerを直接登録します。
また、Channel#queueDeclare#getQueueの結果をリプライ受信用のキュー名として作成し、このキューに対してQueueingConsumerを紐づけます。
次に、UUIDを作成してBasicPropertiesを組み立て、Channel#basicPublishでメッセージを送信します。
String uuid = UUID.randomUUID().toString(); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .correlationId(uuid) .replyTo(replyQueueName) .build(); channel.basicPublish("", "rpc_queue", properties, message.getBytes(StandardCharsets.UTF_8));
この時、先ほど作成したリプライ用のキューの名前をAMQP.BasicProperties.Builder#replyToに設定しています。メッセージの送信自体は、Channel#basicPublishで「rpc_queue」という名前のキューに対して行います。
要するに、送信用(rpc_queue)と受信用(replyQueueName)でキューを分けようということですね。
UUIDはなにに使っているかというと、Correlation Idと呼ばれるものとして扱っています。照合用のIDです。
なにを照合するのかというと、自分が送ったリクエストに対するレスポンスなのか、判断するためのIDを付けようということです。これを、AMQP.BasicProperties.Builder#correlationIdで設定します。
Client側は、この後にQueueingConsumer#nextDeliveryで受信したメッセージの中のCorrelation Idを自分が送ったものと同じかどうか確認します。Correlation Idが違えば、メッセージを無視します(というように実装しています)。
String receivedMessage; QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(uuid)) { receivedMessage = new String(delivery.getBody(), StandardCharsets.UTF_8); } else { receivedMessage = null; }
QueueingConsumer#nextDeliveryは、内部的にBlockingQueueで実装されているので、メッセージが届くまではブロックされます。
Server側
続いて、Server側です。
public void server() throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("kazuhira"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("rpc_queue", false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("rpc_queue", false, consumer); QueueingConsumer.Delivery delivery = consumer.nextDelivery(); AMQP.BasicProperties properties = delivery.getProperties(); AMQP.BasicProperties replyProperties = new AMQP.BasicProperties.Builder() .correlationId(properties.getCorrelationId()) .build(); String message = new String(delivery.getBody(), StandardCharsets.UTF_8); String replyMessage = "★" + message + "★"; channel.basicPublish("", properties.getReplyTo(), replyProperties, replyMessage.getBytes(StandardCharsets.UTF_8)); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); TimeUnit.SECONDS.sleep(5); channel.close(); connection.close(); }
こちらは、「rpc_queue」に対してQueueingConsumerを紐づけます。
channel.queueDeclare("rpc_queue", false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("rpc_queue", false, consumer);
これで、Client側からのメッセージを取得できるまで、待機します。
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
メッセージが届いたら、QueueingConsumer.DeliveryよりBasicPropertiesを取得し、BasicPropertiesからクライアントが送信してきたCorrelation Idを取得してServer側でもBasicPropertiesを組み立てます。
AMQP.BasicProperties properties = delivery.getProperties();
AMQP.BasicProperties replyProperties =
new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId())
.build();
そしてリプライ用のメッセージを組み立て(ここでは「★」を付けることにしました)、Client側が送信する時にReplyToに設定したQueueにChannel#basicPublishで送り返します。
String message = new String(delivery.getBody(), StandardCharsets.UTF_8); String replyMessage = "★" + message + "★"; channel.basicPublish("", properties.getReplyTo(), replyProperties, replyMessage.getBytes(StandardCharsets.UTF_8)); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
Client側は、ここで送り返されたメッセージを、送信時とは別のキューから読み取るというわけですね。
動かしてみる
最後は、テストコードです。
Server側を起動し、Client側がメッセージを送信、返却されたメッセージが送ったメッセージに「★」を加えたものであることが確認できます。
@Test public void rpcCall() throws InterruptedException, IOException, TimeoutException { String message = "Hello World"; ExecutorService es = Executors.newSingleThreadExecutor(); es.submit(() -> { try { server(); } catch (IOException | TimeoutException | InterruptedException e) { throw new RuntimeException(e); } }); TimeUnit.SECONDS.sleep(2L); String replyMessage = client(message); assertThat(replyMessage) .isEqualTo("★Hello World★"); }
動かせた感じですね。