CLOVER🍀

That was when it all began.

RedisのPub/SubをLettuce(Reactive)で試す

これは、なにをしたくて書いたもの?

  • RedisのPub/Subの機能をちょっと試してみたくて
  • クライアントは、LettuceでReactive APIで書いてみよう

という感じで、単にRedisのPub/Subをどんな感じのものか試してみるためのエントリです。

Redis Pub/Sub

Redis上のドキュメントは、こちら。

Pub/Sub – Redis

SUBSCRIBEコマンドで購読登録、UNSUBSCRIBEで購読解除、PUBLISHでメッセージ配信、というのが基本的な
コマンドのようです。

SUBSCRIBE – Redis

UNSUBSCRIBE – Redis

PUBLISH – Redis

メッセージングを実現する方法のひとつとして、覚えておくとよさそうですね?

分散型メッセージングミドルウェアの詳細比較 | POSTD

Redisのpub/sub機能

注意点としては、メッセージをPUBLISHした時にそれを受け取るSubscriberがいないとそのメッセージはなくなること、
SUBSCRIBEしているクライアントは、他のコマンドを発行できない(するべきではない)ということの模様。

まあ、まずは試してみましょう。

環境

今回の環境は、こちら。

Redisのバージョン。

9:C 07 Oct 13:07:11.264 # Configuration loaded
                _._                                                  
           _.-``__ ''-._                                             
      _.-``    `.  `_.  ''-._           Redis 4.0.11 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._                                   
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 9
  `-._    `-._  `-./  _.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |           http://redis.io        
  `-._    `-._`-.__.-'_.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |                                  
  `-._    `-._`-.__.-'_.-'    _.-'                                   
      `-._    `-.__.-'    _.-'                                       
          `-._        _.-'                                           
              `-.__.-'                                               

Redis 4.0.11。リモート接続で、IPアドレスは172.17.0.2とします。パスワードは「redispass」で。

Java

$ java -version
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (build 1.8.0_181-8u181-b13-0ubuntu0.18.04.1-b13)
OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode)


$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T03:33:14+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-36-generic", arch: "amd64", family: "unix"

準備

Maven依存関係は、こちら。Lettuceを使うだけです。

        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.1.0.RELEASE</version>
        </dependency>

ここまでで、準備は完了。それでは、ソースコードを書いていきましょう。

Subscriberを書く

最初は、Subscriberから書いていきましょう。

ドキュメントは、こちらを参照。

Publish/Subscribe / Reactive API

Publisher側のAPIの方は、書いてない気がしますが…。

あとは、テストコードを参考に。

https://github.com/lettuce-io/lettuce-core/blob/5.1.0.RELEASE/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java

それで、作成したシンプルなSubscriberがこちら。
src/main/java/org/littlewings/reactor/redis/RedisSubscriber.java

package org.littlewings.reactor.redis;

import java.util.logging.Logger;

import io.lettuce.core.RedisClient;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import reactor.core.Disposable;

public class RedisSubscriber {
    Logger logger = Logger.getLogger(getClass().getName());

    public static void main(String... args) {
        RedisSubscriber subscriber = new RedisSubscriber();
        subscriber.subscribe();
    }

    public void subscribe() {
        RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0");

        try (StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub()) {
            RedisPubSubReactiveCommands<String, String> reactiveCommands = connection.reactive();

            reactiveCommands.subscribe("channel").block();

            Disposable disposable =
                    reactiveCommands
                            .observeChannels()
                            .doOnNext(channelMessage ->
                                    logger.info(String.format(
                                            "[%s] received message = %s",
                                            channelMessage.getChannel(),
                                            channelMessage.getMessage()))
                            )
                            .subscribe();

            logger.info("start subscribe");

            System.console().readLine("> Enter stop.");

            reactiveCommands.unsubscribe("channel");
            disposable.dispose();
        } finally {
            redisClient.shutdown();
        }
    }
}

ポイントは、Pub/Sub用のConnectionとCommandsを取得すること、

        try (StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub()) {
            RedisPubSubReactiveCommands<String, String> reactiveCommands = connection.reactive();

subscribeするチャネル名を登録すること、

            reactiveCommands.subscribe("channel").block();

subscribeの解除あたりですね。

            reactiveCommands.unsubscribe("channel");

subscribeしたメッセージを処理しているところは、こちら。

            Disposable disposable =
                    reactiveCommands
                            .observeChannels()
                            .doOnNext(channelMessage ->
                                    logger.info(String.format(
                                            "[%s] received message = %s",
                                            channelMessage.getChannel(),
                                            channelMessage.getMessage()))
                            )
                            .subscribe();

ドキュメントやテストでサンプルを見ると、doOnNextでメッセージを処理する感じみたいです。

メッセージを処理しているところは、ほぼReactorの世界ですね。

起動後にEnterを打つと、終了するようにしてあります。

            System.console().readLine("> Enter stop.");

subscribeの解除は、この後に。

            reactiveCommands.unsubscribe("channel");

Publisher

続いて、Publisher側。
src/main/java/org/littlewings/reactor/redis/RedisPublisher.java

package org.littlewings.reactor.redis;

import java.time.Duration;

import io.lettuce.core.RedisClient;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class RedisPublisher {
    public static void main(String... args) {
        RedisPublisher publisher = new RedisPublisher();
        publisher.publish();
    }

    public void publish() {
        RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0");

        try (StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub()) {
            RedisPubSubReactiveCommands<String, String> reactiveCommands = connection.reactive();

            Disposable disposable =
                    Flux
                            .interval(Duration.ofSeconds(1L))
                            .subscribe(count ->
                                    reactiveCommands
                                            .publish("channel", "message-" + count)
                                            .subscribe()
                            );

            System.console().readLine("> Enter stop.");

            disposable.dispose();
            Schedulers.shutdownNow();
        } finally {
            redisClient.shutdown();
        }
    }
}

こちらは、1秒おきにメッセージを送信するPublisherとしました。

メッセージの送信自体は、RedisPubSubReactiveCommands#publishで行いますが、subscribeしないと処理が始まらないので
注意しましょう。

            Disposable disposable =
                    Flux
                            .interval(Duration.ofSeconds(1L))
                            .subscribe(count ->
                                    reactiveCommands
                                            .publish("channel", "message-" + count)
                                            .subscribe()
                            );

こちらもEnterを打ったら終了するようにしているのですが、スレッドが待ち続けてしまうので、今回はSchedulersを
shutdownしておきました。

            System.console().readLine("> Enter stop.");

            disposable.dispose();
            Schedulers.shutdownNow();

動かしてみる

では、ここで作成したPublisherおよびSubscriberを動かしてみましょう。

Subscriberは、複数動かすことにします。

まずは、Subscriberを2つ、Publisherを起動。

## Subscriber 1
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisSubscriber

## Subscriber 2
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisSubscriber


## Publisher
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisPublisher

2つのSubscriberそれぞれに、メッセージが出力されていきます。

## Subscriber 1
情報: [channel] received message = message-0
10 08, 2018 12:11:38 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0
情報: [channel] received message = message-1
10 08, 2018 12:11:39 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0
情報: [channel] received message = message-2
10 08, 2018 12:11:40 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0
情報: [channel] received message = message-3
10 08, 2018 12:11:41 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0
情報: [channel] received message = message-4


## Subscriber 2
情報: [channel] received message = message-0
10 08, 2018 12:11:38 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0
情報: [channel] received message = message-1
10 08, 2018 12:11:39 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0
情報: [channel] received message = message-2
10 08, 2018 12:11:40 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0
情報: [channel] received message = message-3
10 08, 2018 12:11:41 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0
情報: [channel] received message = message-4

ひとつのチャネルに複数のSubscriberを付けても、それぞれメッセージを読み込める感じですね。

ここで、もうひとつSubscriberを追加します。

## Subscriber 3
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisSubscriber

追加されたSubscriberは、起動後に送信されたメッセージをsubscribeしていきます。

## Subscriber 3
情報: [channel] received message = message-83
10 08, 2018 12:13:01 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0
情報: [channel] received message = message-84
10 08, 2018 12:13:02 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0
情報: [channel] received message = message-85
10 08, 2018 12:13:03 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0
情報: [channel] received message = message-86
10 08, 2018 12:13:04 午前 org.littlewings.reactor.redis.RedisSubscriber lambda$subscribe$0
情報: [channel] received message = message-87

途中でSubscriberを停止して、再度起動したりしても、subscribeを始めた後にやはり読み出します。

これで、単純な使い方はわかった感じですね。

複数のチャネルからメッセージを読む

Subscriberは、チャネルを複数subscribeすることができます。

例。
src/main/java/org/littlewings/reactor/redis/RedisMultiSubscriber.java

package org.littlewings.reactor.redis;

import java.util.logging.Logger;

import io.lettuce.core.RedisClient;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import reactor.core.Disposable;

public class RedisMultiSubscriber {
    Logger logger = Logger.getLogger(getClass().getName());

    public static void main(String... args) {
        RedisMultiSubscriber subscriber = new RedisMultiSubscriber();
        subscriber.subscribe();
    }

    public void subscribe() {
        RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0");

        try (StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub()) {
            RedisPubSubReactiveCommands<String, String> reactiveCommands = connection.reactive();

            reactiveCommands.subscribe("channel-1", "channel-2", "my-channel").block();

            Disposable disposable =
                    reactiveCommands
                            .observeChannels()
                            .doOnNext(channelMessage ->
                                    logger.info(String.format(
                                            "[%s] received message = %s",
                                            channelMessage.getChannel(),
                                            channelMessage.getMessage()))
                            )
                            .subscribe();

            logger.info("start subscribe");

            System.console().readLine("> Enter stop.");

            reactiveCommands.unsubscribe("channel-1", "channel-2", "my-channel");
            disposable.dispose();
        } finally {
            redisClient.shutdown();
        }
    }
}

先程のサンプルと違うのは、RedisPubSubReactiveCommands#subscribeで、複数のチャネルを指定しているところ。
可変長引数になっているので、複数個の指定が可能です。今回は、3つのチャネルからsubscribeしてみましょう。

            reactiveCommands.subscribe("channel-1", "channel-2", "my-channel").block();

それぞれ、「"channel-1」、「channel-2」、「my-channel」です。

unsubscribeも同様に、複数個の指定ができます。

            reactiveCommands.unsubscribe("channel-1", "channel-2", "my-channel");

Publisherの方は、単純に複数のチャネルに対するRedisPubSubReactiveCommands#publishを書くだけです。
src/main/java/org/littlewings/reactor/redis/RedisMultiPublisher.java

package org.littlewings.reactor.redis;

import java.time.Duration;

import io.lettuce.core.RedisClient;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class RedisMultiPublisher {
    public static void main(String... args) {
        RedisMultiPublisher publisher = new RedisMultiPublisher();
        publisher.publish();
    }

    public void publish() {
        RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0");

        try (StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub()) {
            RedisPubSubReactiveCommands<String, String> reactiveCommands = connection.reactive();

            Disposable disposable =
                    Flux
                            .interval(Duration.ofSeconds(1L))
                            .subscribe(count ->
                                    reactiveCommands
                                            .publish("channel-1", "message-" + count)
                                            .subscribe()
                            );

            Disposable disposable2 =
                    Flux
                            .interval(Duration.ofSeconds(3L))
                            .subscribe(count ->
                                    reactiveCommands
                                            .publish("channel-2", "message-" + (count + 10000))
                                            .subscribe()
                            );

            Disposable disposable3 =
                    Flux
                            .interval(Duration.ofSeconds(2L))
                            .subscribe(count ->
                                    reactiveCommands
                                            .publish("my-channel", "message-" + (count * 15))
                                            .subscribe()
                            );

            System.console().readLine("> Enter stop.");

            disposable.dispose();
            disposable2.dispose();
            disposable3.dispose();

            Schedulers.shutdownNow();
        } finally {
            redisClient.shutdown();
        }
    }
}

1秒おき、3秒おき、2秒おきにそれぞれメッセージを送信しています。

            Disposable disposable =
                    Flux
                            .interval(Duration.ofSeconds(1L))
                            .subscribe(count ->
                                    reactiveCommands
                                            .publish("channel-1", "message-" + count)
                                            .subscribe()
                            );

            Disposable disposable2 =
                    Flux
                            .interval(Duration.ofSeconds(3L))
                            .subscribe(count ->
                                    reactiveCommands
                                            .publish("channel-2", "message-" + (count + 10000))
                                            .subscribe()
                            );

            Disposable disposable3 =
                    Flux
                            .interval(Duration.ofSeconds(2L))
                            .subscribe(count ->
                                    reactiveCommands
                                            .publish("my-channel", "message-" + (count * 15))
                                            .subscribe()
                            );

確認。今回は、複数のSubscriberによる確認は省略します(最初の例と同じように、複数のSubscriberがそれぞれ同じメッセージを
読むことができます)。

## Subscriber
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisMultiSubscriber

## Publisher
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisMultiPublisher

結果。各チャネルのメッセージを受信できていることがわかります。

情報: [channel-1] received message = message-0
10 08, 2018 12:21:05 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0
情報: [channel-1] received message = message-1
10 08, 2018 12:21:05 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0
情報: [my-channel] received message = message-0
10 08, 2018 12:21:06 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0
情報: [channel-1] received message = message-2
10 08, 2018 12:21:06 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0
情報: [channel-2] received message = message-10000
10 08, 2018 12:21:07 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0
情報: [channel-1] received message = message-3
10 08, 2018 12:21:07 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0
情報: [my-channel] received message = message-15
10 08, 2018 12:21:08 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0
情報: [channel-1] received message = message-4
10 08, 2018 12:21:09 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0
情報: [channel-1] received message = message-5
10 08, 2018 12:21:09 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0
情報: [channel-2] received message = message-10001
10 08, 2018 12:21:09 午前 org.littlewings.reactor.redis.RedisMultiSubscriber lambda$subscribe$0
情報: [my-channel] received message = message-30

パターンでSubscribeするチャネルを指定する

最後は、パターンでSubscribeするチャネルを指定してみます。

ドキュメントでいくと、PSUBSCRIBEコマンドです。

PSUBSCRIBE – Redis

Subscribe解除は、PUNSUBSCRIBEコマンド。

PUNSUBSCRIBE – Redis

PSUBSCRIBEコマンドは、globスタイルのパターンをサポートしています。

  • ? … 任意の1文字
  • * … 0文字以上の任意の文字列にマッチ
  • [ ] … 括弧内で指定された1文字にマッチ

エスケープは、「\」で。

サンプルでいくと、こんな感じです。

h?llo subscribes to hello, hallo and hxllo h*llo subscribes to hllo and heeeello h[ae]llo subscribes to hello and hallo, but not hillo

[ ]が範囲までできるかとかは、確認していません…。

src/main/java/org/littlewings/reactor/redis/RedisPatternSubscriber.java

package org.littlewings.reactor.redis;

import java.util.logging.Logger;

import io.lettuce.core.RedisClient;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import reactor.core.Disposable;

public class RedisPatternSubscriber {
    Logger logger = Logger.getLogger(getClass().getName());

    public static void main(String... args) {
        RedisPatternSubscriber subscriber = new RedisPatternSubscriber();
        subscriber.subscribe();
    }

    public void subscribe() {
        RedisClient redisClient = RedisClient.create("redis://redispass@172.17.0.2:6379/0");

        try (StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub()) {
            RedisPubSubReactiveCommands<String, String> reactiveCommands = connection.reactive();

            reactiveCommands.psubscribe("channel-*").block();

            Disposable disposable =
                    reactiveCommands
                            .observePatterns()
                            .doOnNext(patternMessage ->
                                    logger.info(String.format(
                                            "[%s] [%s] received message = %s",
                                            patternMessage.getPattern(),
                                            patternMessage.getChannel(),
                                            patternMessage.getMessage()))
                            )
                            .subscribe();

            logger.info("start subscribe");

            System.console().readLine("> Enter stop.");

            reactiveCommands.punsubscribe("channel-*");
            disposable.dispose();
        } finally {
            redisClient.shutdown();
        }
    }
}

RedisPubSubReactiveCommands#subscribe、unsubscribeを、RedisPubSubReactiveCommands#psubscribe、punsubscribeに
変えた感じですね。

            reactiveCommands.psubscribe("channel-*").block();

            reactiveCommands.punsubscribe("channel-*");

psubscribeを使うと、subscribeするオブジェクトの型が変わり、Subscribeしているパターンも取得することができます。

                            .doOnNext(patternMessage ->
                                    logger.info(String.format(
                                            "[%s] [%s] received message = %s",
                                            patternMessage.getPattern(),
                                            patternMessage.getChannel(),
                                            patternMessage.getMessage()))
                            )

Publisherの方は、先程の複数のチャネルに対してメッセージを送信していたものをそのまま使いましょう。

            Disposable disposable =
                    Flux
                            .interval(Duration.ofSeconds(1L))
                            .subscribe(count ->
                                    reactiveCommands
                                            .publish("channel-1", "message-" + count)
                                            .subscribe()
                            );

            Disposable disposable2 =
                    Flux
                            .interval(Duration.ofSeconds(3L))
                            .subscribe(count ->
                                    reactiveCommands
                                            .publish("channel-2", "message-" + (count + 10000))
                                            .subscribe()
                            );

            Disposable disposable3 =
                    Flux
                            .interval(Duration.ofSeconds(2L))
                            .subscribe(count ->
                                    reactiveCommands
                                            .publish("my-channel", "message-" + (count * 15))
                                            .subscribe()
                            );

「channel-1」と「channel-2」の2つのチャネルだけから、メッセージが読まれるはずですね。

確認。

## Subscriber
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisPatternSubscriber


## Producer
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.redis.RedisMultiPublisher

出力されるメッセージは、「channel-1」と「channel-2」に絞られていることが確認できました。

情報: [channel-*] [channel-1] received message = message-0
10 08, 2018 12:44:28 午前 org.littlewings.reactor.redis.RedisPatternSubscriber lambda$subscribe$0
情報: [channel-*] [channel-1] received message = message-1
10 08, 2018 12:44:29 午前 org.littlewings.reactor.redis.RedisPatternSubscriber lambda$subscribe$0
情報: [channel-*] [channel-1] received message = message-2
10 08, 2018 12:44:29 午前 org.littlewings.reactor.redis.RedisPatternSubscriber lambda$subscribe$0
情報: [channel-*] [channel-2] received message = message-10000
10 08, 2018 12:44:30 午前 org.littlewings.reactor.redis.RedisPatternSubscriber lambda$subscribe$0
情報: [channel-*] [channel-1] received message = message-3
10 08, 2018 12:44:31 午前 org.littlewings.reactor.redis.RedisPatternSubscriber lambda$subscribe$0
情報: [channel-*] [channel-1] received message = message-4
10 08, 2018 12:44:32 午前 org.littlewings.reactor.redis.RedisPatternSubscriber lambda$subscribe$0
情報: [channel-*] [channel-1] received message = message-5
10 08, 2018 12:44:32 午前 org.littlewings.reactor.redis.RedisPatternSubscriber lambda$subscribe$0
情報: [channel-*] [channel-2] received message = message-10001

まとめ

RedisのPub/Subを、Lettuceを使って確認してみました。

試している時に、最初Publisherをうまく動かせなくてハマったりしていたのですが、ちゃんとsubscribeメソッドを
実行していなかったとかのミスだったり…。

確認したい内容はだいたい見れたので、よしとしましょうー。