これは、なにをしたくて書いたもの?
という感じで、単にRedisのPub/Subをどんな感じのものか試してみるためのエントリです。
Redis Pub/Sub
Redis上のドキュメントは、こちら。
SUBSCRIBEコマンドで購読登録、UNSUBSCRIBEで購読解除、PUBLISHでメッセージ配信、というのが基本的な
コマンドのようです。
メッセージングを実現する方法のひとつとして、覚えておくとよさそうですね?
注意点としては、メッセージをPUBLISHした時にそれを受け取るSubscriberがいないとそのメッセージはなくなること、
SUBSCRIBEしているクライアントは、他のコマンドを発行できない(するべきではない)ということの模様。
まあ、まずは試してみましょう。
環境
今回の環境は、こちら。
Redisのバージョン。
9:C 07 Oct 13:07:11.264 # Configuration loaded _._ _.-``__ ''-._ _.-`` `. `_. ''-._ Redis 4.0.11 (00000000/0) 64 bit .-`` .-```. ```\/ _.,_ ''-._ ( ' , .-` | `, ) Running in standalone mode |`-._`-...-` __...-.``-._|'` _.-'| Port: 6379 | `-._ `._ / _.-' | PID: 9 `-._ `-._ `-./ _.-' _.-' |`-._`-._ `-.__.-' _.-'_.-'| | `-._`-._ _.-'_.-' | http://redis.io `-._ `-._`-.__.-'_.-' _.-' |`-._`-._ `-.__.-' _.-'_.-'| | `-._`-._ _.-'_.-' | `-._ `-._`-.__.-'_.-' _.-' `-._ `-.__.-' _.-' `-._ _.-' `-.__.-'
Redis 4.0.11。リモート接続で、IPアドレスは172.17.0.2とします。パスワードは「redispass」で。
Java。
$ java -version openjdk version "1.8.0_181" OpenJDK Runtime Environment (build 1.8.0_181-8u181-b13-0ubuntu0.18.04.1-b13) OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode) $ mvn -version Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T03:33:14+09:00) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "4.15.0-36-generic", arch: "amd64", family: "unix"
準備
Maven依存関係は、こちら。Lettuceを使うだけです。
<dependency> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> <version>5.1.0.RELEASE</version> </dependency>
ここまでで、準備は完了。それでは、ソースコードを書いていきましょう。
Subscriberを書く
最初は、Subscriberから書いていきましょう。
ドキュメントは、こちらを参照。
Publish/Subscribe / Reactive API
Publisher側のAPIの方は、書いてない気がしますが…。
あとは、テストコードを参考に。
それで、作成したシンプルなSubscriberがこちら。
src/main/java/org/littlewings/reactor/redis/RedisSubscriber.java
package org.littlewings.reactor.redis; import java.util.logging.Logger; import io.lettuce.core.RedisClient; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; import reactor.core.Disposable; public class RedisSubscriber { Logger logger = Logger.getLogger(getClass().getName()); public static void main(String... args) { RedisSubscriber subscriber = new RedisSubscriber(); subscriber.subscribe(); } public void subscribe() { RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0"); try (StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub()) { RedisPubSubReactiveCommands<String, String> reactiveCommands = connection.reactive(); reactiveCommands.subscribe("channel").block(); Disposable disposable = reactiveCommands .observeChannels() .doOnNext(channelMessage -> logger.info(String.format( "[%s] received message = %s", channelMessage.getChannel(), channelMessage.getMessage())) ) .subscribe(); logger.info("start subscribe"); System.console().readLine("> Enter stop."); reactiveCommands.unsubscribe("channel"); disposable.dispose(); } finally { redisClient.shutdown(); } } }
ポイントは、Pub/Sub用のConnectionとCommandsを取得すること、
try (StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub()) {
RedisPubSubReactiveCommands<String, String> reactiveCommands = connection.reactive();
subscribeするチャネル名を登録すること、
reactiveCommands.subscribe("channel").block();
subscribeの解除あたりですね。
reactiveCommands.unsubscribe("channel");
subscribeしたメッセージを処理しているところは、こちら。
Disposable disposable = reactiveCommands .observeChannels() .doOnNext(channelMessage -> logger.info(String.format( "[%s] received message = %s", channelMessage.getChannel(), channelMessage.getMessage())) ) .subscribe();
ドキュメントやテストでサンプルを見ると、doOnNextでメッセージを処理する感じみたいです。
メッセージを処理しているところは、ほぼReactorの世界ですね。
起動後にEnterを打つと、終了するようにしてあります。
System.console().readLine("> Enter stop.");
subscribeの解除は、この後に。
reactiveCommands.unsubscribe("channel");
Publisher
続いて、Publisher側。
src/main/java/org/littlewings/reactor/redis/RedisPublisher.java
package org.littlewings.reactor.redis; import java.time.Duration; import io.lettuce.core.RedisClient; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; public class RedisPublisher { public static void main(String... args) { RedisPublisher publisher = new RedisPublisher(); publisher.publish(); } public void publish() { RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0"); try (StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub()) { RedisPubSubReactiveCommands<String, String> reactiveCommands = connection.reactive(); Disposable disposable = Flux .interval(Duration.ofSeconds(1L)) .subscribe(count -> reactiveCommands .publish("channel", "message-" + count) .subscribe() ); System.console().readLine("> Enter stop."); disposable.dispose(); Schedulers.shutdownNow(); } finally { redisClient.shutdown(); } } }
こちらは、1秒おきにメッセージを送信するPublisherとしました。
メッセージの送信自体は、RedisPubSubReactiveCommands#publishで行いますが、subscribeしないと処理が始まらないので
注意しましょう。
Disposable disposable = Flux .interval(Duration.ofSeconds(1L)) .subscribe(count -> reactiveCommands .publish("channel", "message-" + count) .subscribe() );
こちらもEnterを打ったら終了するようにしているのですが、スレッドが待ち続けてしまうので、今回はSchedulersを
shutdownしておきました。
System.console().readLine("> Enter stop.");
disposable.dispose();
Schedulers.shutdownNow();
動かしてみる
では、ここで作成したPublisherおよびSubscriberを動かしてみましょう。
Subscriberは、複数動かすことにします。
まずは、Subscriberを2つ、Publisherを起動。
## Subscriber 1 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisSubscriber ## Subscriber 2 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisSubscriber ## Publisher $ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisPublisher
2つのSubscriberそれぞれに、メッセージが出力されていきます。
## Subscriber 1 情報: [channel] received message = message-0 10 08, 2018 12:11:38 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0 情報: [channel] received message = message-1 10 08, 2018 12:11:39 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0 情報: [channel] received message = message-2 10 08, 2018 12:11:40 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0 情報: [channel] received message = message-3 10 08, 2018 12:11:41 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0 情報: [channel] received message = message-4 ## Subscriber 2 情報: [channel] received message = message-0 10 08, 2018 12:11:38 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0 情報: [channel] received message = message-1 10 08, 2018 12:11:39 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0 情報: [channel] received message = message-2 10 08, 2018 12:11:40 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0 情報: [channel] received message = message-3 10 08, 2018 12:11:41 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0 情報: [channel] received message = message-4
ひとつのチャネルに複数のSubscriberを付けても、それぞれメッセージを読み込める感じですね。
ここで、もうひとつSubscriberを追加します。
## Subscriber 3 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisSubscriber
追加されたSubscriberは、起動後に送信されたメッセージをsubscribeしていきます。
## Subscriber 3 情報: [channel] received message = message-83 10 08, 2018 12:13:01 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0 情報: [channel] received message = message-84 10 08, 2018 12:13:02 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0 情報: [channel] received message = message-85 10 08, 2018 12:13:03 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0 情報: [channel] received message = message-86 10 08, 2018 12:13:04 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0 情報: [channel] received message = message-87
途中でSubscriberを停止して、再度起動したりしても、subscribeを始めた後にやはり読み出します。
これで、単純な使い方はわかった感じですね。
複数のチャネルからメッセージを読む
Subscriberは、チャネルを複数subscribeすることができます。
例。
src/main/java/org/littlewings/reactor/redis/RedisMultiSubscriber.java
package org.littlewings.reactor.redis; import java.util.logging.Logger; import io.lettuce.core.RedisClient; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; import reactor.core.Disposable; public class RedisMultiSubscriber { Logger logger = Logger.getLogger(getClass().getName()); public static void main(String... args) { RedisMultiSubscriber subscriber = new RedisMultiSubscriber(); subscriber.subscribe(); } public void subscribe() { RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0"); try (StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub()) { RedisPubSubReactiveCommands<String, String> reactiveCommands = connection.reactive(); reactiveCommands.subscribe("channel-1", "channel-2", "my-channel").block(); Disposable disposable = reactiveCommands .observeChannels() .doOnNext(channelMessage -> logger.info(String.format( "[%s] received message = %s", channelMessage.getChannel(), channelMessage.getMessage())) ) .subscribe(); logger.info("start subscribe"); System.console().readLine("> Enter stop."); reactiveCommands.unsubscribe("channel-1", "channel-2", "my-channel"); disposable.dispose(); } finally { redisClient.shutdown(); } } }
先程のサンプルと違うのは、RedisPubSubReactiveCommands#subscribeで、複数のチャネルを指定しているところ。
可変長引数になっているので、複数個の指定が可能です。今回は、3つのチャネルからsubscribeしてみましょう。
reactiveCommands.subscribe("channel-1", "channel-2", "my-channel").block();
それぞれ、「"channel-1」、「channel-2」、「my-channel」です。
unsubscribeも同様に、複数個の指定ができます。
reactiveCommands.unsubscribe("channel-1", "channel-2", "my-channel");
Publisherの方は、単純に複数のチャネルに対するRedisPubSubReactiveCommands#publishを書くだけです。
src/main/java/org/littlewings/reactor/redis/RedisMultiPublisher.java
package org.littlewings.reactor.redis; import java.time.Duration; import io.lettuce.core.RedisClient; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; public class RedisMultiPublisher { public static void main(String... args) { RedisMultiPublisher publisher = new RedisMultiPublisher(); publisher.publish(); } public void publish() { RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0"); try (StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub()) { RedisPubSubReactiveCommands<String, String> reactiveCommands = connection.reactive(); Disposable disposable = Flux .interval(Duration.ofSeconds(1L)) .subscribe(count -> reactiveCommands .publish("channel-1", "message-" + count) .subscribe() ); Disposable disposable2 = Flux .interval(Duration.ofSeconds(3L)) .subscribe(count -> reactiveCommands .publish("channel-2", "message-" + (count + 10000)) .subscribe() ); Disposable disposable3 = Flux .interval(Duration.ofSeconds(2L)) .subscribe(count -> reactiveCommands .publish("my-channel", "message-" + (count * 15)) .subscribe() ); System.console().readLine("> Enter stop."); disposable.dispose(); disposable2.dispose(); disposable3.dispose(); Schedulers.shutdownNow(); } finally { redisClient.shutdown(); } } }
1秒おき、3秒おき、2秒おきにそれぞれメッセージを送信しています。
Disposable disposable = Flux .interval(Duration.ofSeconds(1L)) .subscribe(count -> reactiveCommands .publish("channel-1", "message-" + count) .subscribe() ); Disposable disposable2 = Flux .interval(Duration.ofSeconds(3L)) .subscribe(count -> reactiveCommands .publish("channel-2", "message-" + (count + 10000)) .subscribe() ); Disposable disposable3 = Flux .interval(Duration.ofSeconds(2L)) .subscribe(count -> reactiveCommands .publish("my-channel", "message-" + (count * 15)) .subscribe() );
確認。今回は、複数のSubscriberによる確認は省略します(最初の例と同じように、複数のSubscriberがそれぞれ同じメッセージを
読むことができます)。
## Subscriber $ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisMultiSubscriber ## Publisher $ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisMultiPublisher
結果。各チャネルのメッセージを受信できていることがわかります。
情報: [channel-1] received message = message-0 10 08, 2018 12:21:05 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0 情報: [channel-1] received message = message-1 10 08, 2018 12:21:05 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0 情報: [my-channel] received message = message-0 10 08, 2018 12:21:06 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0 情報: [channel-1] received message = message-2 10 08, 2018 12:21:06 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0 情報: [channel-2] received message = message-10000 10 08, 2018 12:21:07 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0 情報: [channel-1] received message = message-3 10 08, 2018 12:21:07 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0 情報: [my-channel] received message = message-15 10 08, 2018 12:21:08 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0 情報: [channel-1] received message = message-4 10 08, 2018 12:21:09 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0 情報: [channel-1] received message = message-5 10 08, 2018 12:21:09 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0 情報: [channel-2] received message = message-10001 10 08, 2018 12:21:09 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0 情報: [my-channel] received message = message-30
パターンでSubscribeするチャネルを指定する
最後は、パターンでSubscribeするチャネルを指定してみます。
ドキュメントでいくと、PSUBSCRIBEコマンドです。
Subscribe解除は、PUNSUBSCRIBEコマンド。
PSUBSCRIBEコマンドは、globスタイルのパターンをサポートしています。
- ? … 任意の1文字
- * … 0文字以上の任意の文字列にマッチ
- [ ] … 括弧内で指定された1文字にマッチ
エスケープは、「\」で。
サンプルでいくと、こんな感じです。
h?llo subscribes to hello, hallo and hxllo h*llo subscribes to hllo and heeeello h[ae]llo subscribes to hello and hallo, but not hillo
[ ]が範囲までできるかとかは、確認していません…。
src/main/java/org/littlewings/reactor/redis/RedisPatternSubscriber.java
package org.littlewings.reactor.redis; import java.util.logging.Logger; import io.lettuce.core.RedisClient; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; import reactor.core.Disposable; public class RedisPatternSubscriber { Logger logger = Logger.getLogger(getClass().getName()); public static void main(String... args) { RedisPatternSubscriber subscriber = new RedisPatternSubscriber(); subscriber.subscribe(); } public void subscribe() { RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0"); try (StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub()) { RedisPubSubReactiveCommands<String, String> reactiveCommands = connection.reactive(); reactiveCommands.psubscribe("channel-*").block(); Disposable disposable = reactiveCommands .observePatterns() .doOnNext(patternMessage -> logger.info(String.format( "[%s] [%s] received message = %s", patternMessage.getPattern(), patternMessage.getChannel(), patternMessage.getMessage())) ) .subscribe(); logger.info("start subscribe"); System.console().readLine("> Enter stop."); reactiveCommands.punsubscribe("channel-*"); disposable.dispose(); } finally { redisClient.shutdown(); } } }
RedisPubSubReactiveCommands#subscribe、unsubscribeを、RedisPubSubReactiveCommands#psubscribe、punsubscribeに
変えた感じですね。
reactiveCommands.psubscribe("channel-*").block(); reactiveCommands.punsubscribe("channel-*");
psubscribeを使うと、subscribeするオブジェクトの型が変わり、Subscribeしているパターンも取得することができます。
.doOnNext(patternMessage -> logger.info(String.format( "[%s] [%s] received message = %s", patternMessage.getPattern(), patternMessage.getChannel(), patternMessage.getMessage())) )
Publisherの方は、先程の複数のチャネルに対してメッセージを送信していたものをそのまま使いましょう。
Disposable disposable = Flux .interval(Duration.ofSeconds(1L)) .subscribe(count -> reactiveCommands .publish("channel-1", "message-" + count) .subscribe() ); Disposable disposable2 = Flux .interval(Duration.ofSeconds(3L)) .subscribe(count -> reactiveCommands .publish("channel-2", "message-" + (count + 10000)) .subscribe() ); Disposable disposable3 = Flux .interval(Duration.ofSeconds(2L)) .subscribe(count -> reactiveCommands .publish("my-channel", "message-" + (count * 15)) .subscribe() );
「channel-1」と「channel-2」の2つのチャネルだけから、メッセージが読まれるはずですね。
確認。
## Subscriber $ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisPatternSubscriber ## Producer $ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisMultiPublisher
出力されるメッセージは、「channel-1」と「channel-2」に絞られていることが確認できました。
情報: [channel-*] [channel-1] received message = message-0 10 08, 2018 12:44:28 午前 org.littlewings.reactor.redis.RedisPatternSubscriber lambda$subscribe$0 情報: [channel-*] [channel-1] received message = message-1 10 08, 2018 12:44:29 午前 org.littlewings.reactor.redis.RedisPatternSubscriber lambda$subscribe$0 情報: [channel-*] [channel-1] received message = message-2 10 08, 2018 12:44:29 午前 org.littlewings.reactor.redis.RedisPatternSubscriber lambda$subscribe$0 情報: [channel-*] [channel-2] received message = message-10000 10 08, 2018 12:44:30 午前 org.littlewings.reactor.redis.RedisPatternSubscriber lambda$subscribe$0 情報: [channel-*] [channel-1] received message = message-3 10 08, 2018 12:44:31 午前 org.littlewings.reactor.redis.RedisPatternSubscriber lambda$subscribe$0 情報: [channel-*] [channel-1] received message = message-4 10 08, 2018 12:44:32 午前 org.littlewings.reactor.redis.RedisPatternSubscriber lambda$subscribe$0 情報: [channel-*] [channel-1] received message = message-5 10 08, 2018 12:44:32 午前 org.littlewings.reactor.redis.RedisPatternSubscriber lambda$subscribe$0 情報: [channel-*] [channel-2] received message = message-10001
まとめ
RedisのPub/Subを、Lettuceを使って確認してみました。
試している時に、最初Publisherをうまく動かせなくてハマったりしていたのですが、ちゃんとsubscribeメソッドを
実行していなかったとかのミスだったり…。
確認したい内容はだいたい見れたので、よしとしましょうー。