CLOVER🍀

That was when it all began.

Spring Integrationを試してみる

これは、なにをしたくて書いたもの?

Spring Integrationを少し試しておきたいな、と思いまして。

Spring Integration

Spring Integrationは、EIP(Enterprise Integration Patterns)をSpringで実現するためのものです。

Extends the Spring programming model to support the well-known Enterprise Integration Patterns.

Spring Integration

宣言的なアダプターを使い、外部システムとSpringベースのアプリケーションを軽量なメッセージングで統合します。

Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters.

ドキュメントは、こちら。

Spring Integration Reference Guide

こちらの資料が、Spring Integrationについてわかりやすくまとめられていると思います。

システム間連携を担うSpring Integrationのエンタープライズ開発での活用

Spring Integrationにおける、主な要素はこちらのページに書かれてあります。

Overview of Spring Integration Framework

まずはメインコンポーネント

Spring Integration Overview / Main Components

  • Message … メッセージそのものを指し、ヘッダーとペイロードで構成される
    • Message にインターフェースについての説明が記載
  • Message Channel … 各コンポーネントをつなぐパイプの役割で、Messageの送信側(Producer)と受信側(Consumer)の関係を決定する
    • 以下の2つの構成が可能
      • "point-to-point" … Message Channelに送信されたメッセージを受け取るConsumerはひとつ
      • "publish-subscribe" … Message Channelに送信されたメッセージを、すべてのConsumerにブロードキャストする
    • さらに、Consumerに対しては以下の2つのオプションを選択できる
      • pollable … バッファリング可能、能動的に受信(polling)
      • subscribable … バッファリング不可、そのままMessageを受け取る
  • Message Endpoint … アプリケーションコードをメッセージングフレームワークに接続する役割を持ち、アーキテクチャー内のフィルター役を担う
    • MVCパラダイムにおける、Controllerの役割に近い
    • 複数のEndpointの種類がある
      • Message Transformer … Messageの変換を行う
      • Message Filter … Messageを出力Channelに渡すかどうかを決定する。多くの場合、publish-subscribeと組み合わせる
      • Message Router … 次にMessageを送るChannelを決定する
      • Splitter … Messageを入力Channelから受け取り、Messageを複数のMessageに分割、それぞれを出力Channelに送る
      • Aggregator … 複数のMessageを統合してひとつのMessageにする。Splitterの逆。
      • Service Activator … サービスインスタンスをメッセージングシステムに接続するための、汎用のEndpoint
      • Channel Adapter … Message Channelを他のシステムまたはトランスポートに接続するEndpointであり、入力や出力を別システムとすることができる

プログラミングの際のポイントは、こちらを参照。

Spring Integration Overview / Programming Tips and Tricks

また、これらの主要なAPIについては以下のページに記載されています。

Core Messaging

眺めていてもよくわからないので、メモ的に書いていってみます…。

Message Channelについては、こちら。

Messaging Channels / Message Channels

  • MessageChannelインターフェース
    • サブインターフェース
      • PollableChannelインターフェース … Consumerがバッファリングできることを踏まえ、ポーリングが行えることを表すインターフェース
      • SubscribableChannelインターフェース … サブスクライブしているインスタンスに、メッセージを直接渡すことを表すインターフェース。ポーリングは行わない
    • 実装
      • PublishSubscribeChannelクラス … SubscribableChannelインターフェースの実装であり、Messageをサブスクライブしているハンドラーにブロードキャストする
      • QueueChannelクラス … PollableChannelインターフェースの実装であり、MessageをFIFOキューに格納し、ひとつのConsumerにメッセージを送信する
      • PriorityChannelクラス …QueueChannelクラスとほぼ同じだが、キューがMessageの優先順位によって制御される(PriorityQueue)
      • RendezvousChannelクラス … QueueChannelクラスに似ているが、ProducerがConsumerに直接Messageを渡すかのような動きになる(長さ0のキューに類似)
      • DirectChannelクラス … SubscribableChannelインターフェースの実装で、単一のハンドラーにMessageを渡す。この時、Messageの送受信側の両方は、単一のスレッドで動作する。Spring IntegrationのデフォルトのChannel
      • ExecutorChannelクラス … DirectChannelクラスと似ているが、TaskExecutorに処理がディスパッチされることが異なる点。メッセージの送受信側がそれぞれ別のスレッドになる可能性がある

Pollerでは、Spring Integrationにおいてどのようにポーリングが機能しているかについて書かれています。

Messaging Channels / Poller

Channel Adapterでは、Messageの送信側、受信側に関する記述、設定について書かれています。

Messaging Channels / Channel Adapter

Spring Integrationでデータベースやファイル等からデータを読み書き(Inbound/Outbound)する際には、このChannel Adapterを使用します。

Messaging Bridgeでは、2つのMessage ChannelまたはChannel Adapterを接続する方法について書かれています。

Messaging Channels / Messaging Bridge

Channel Adapter、Message Bridgeについては設定例が書かれているので、見るとイメージが湧きやすいかなと思います。

設定はXMLDSLで行うようですが、個人的にはできればDSLがいいかな、と思います。

Java DSL

Message Endpointについては、以下にさらに詳細にまとまっているのですが。

Messaging Endpoints

Messaging Gatewayという、リクエスト/レスポンスの形態を取れるEndpointもあるようです。

Spring IntegrationでサポートしているEndpointは、こちら。

  • AMQP
  • ApplicationEvents(Spring)
  • Feed(RSSATOM
  • File
  • FTP/FTPS
  • Pivotal Gemfire
  • HTTP
  • JDBC
  • JMS
  • JMX
  • JPA
  • Apache Cassandra
  • Mail
  • MongoDB
  • MQTT
  • R2DBC
  • Redis
  • Resource(Spring)
  • RMI
  • RSocket
  • SFTP
  • STOMP
  • Stream
  • Syslog
  • TCP
  • UDP
  • WebFlux
  • Web Services
  • WebSocket
  • XMPP
  • ZeroMQ

各EndpointがInbound Channel Adapter、Outbound Channel Adapter、Inbound Gateway、Outbound Gatewayのいずれを備えるかは、
以下にQuick Referenceがあります。

Integration Endpoints

その他、サンプルは以下のページやGitHubリポジトリにあります。

Getting Started | Integrating Data

Spring Integration

GitHub - spring-projects/spring-integration-samples: You are looking for examples, code snippets, sample applications for Spring Integration? This is the place.

だいぶ長くなりましたが、そろそろ実際に試してみましょう。

お題

今回のお題は、以下にしましょう。

  • Redisをサブスクライブしてメッセージを受信
  • メッセージはJSONとし、オブジェクトに変換
  • オブジェクトの内容を加工して、標準出力およびファイルに出力

使用するのは、以下ですね。

Redis Support

Stream Support

File Support

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.2 2022-01-18
OpenJDK Runtime Environment (build 17.0.2+8-Ubuntu-120.04)
OpenJDK 64-Bit Server VM (build 17.0.2+8-Ubuntu-120.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.2, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-107-generic", arch: "amd64", family: "unix"

Redisは172.17.0.2で動作しているものとし、パスワードはredispassとします。

バージョンはこちら。

$ bin/redis-server --version
Redis server v=6.2.6 sha=00000000:0 malloc=jemalloc-5.1.0 bits=64 build=7c7087afbd1829d1

プロジェクトの作成

まずは、Spring Initializrを使ってプロジェクト作成します。

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.6.6 \
  -d javaVersion=17 \
  -d name=spring-integration-example \
  -d groupId=org.littlewings \
  -d artifactId=spring-integration-example \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings \
  -d dependencies=web,integration,data-redis \
  -d baseDir=spring-integration-example | tar zxvf -

依存関係にはwebintegrationdata-redisを含めていますが、これだけではまだ足りません。

プロジェクト内へ移動。

$ cd spring-integration-example

この時点での依存関係等。

        <properties>
                <java.version>17</java.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-data-redis</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-integration</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-web</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-http</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-redis</artifactId>
                </dependency>

                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-test</artifactId>
                        <scope>test</scope>
                </dependency>
        </dependencies>

        <build>
                <plugins>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
                        </plugin>
                </plugins>
        </build>

依存関係は、以下のように変更します。

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--
       <dependency>
           <groupId>org.springframework.integration</groupId>
           <artifactId>spring-integration-http</artifactId>
       </dependency>
       -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

spring-integration-httpは使わないので削除。spring-integration-redisspring-integration-streamspring-integration-fileを追加。

なお、Spring Data Redisはspring-integration-redisが使用するので必要です。spring-integration-redis自体の依存関係にも含まれて
いるのですが、まるっとspring-boot-starter-data-redisを使うことにしました。

最初から含まれているソースコードは削除。

$ rm src/main/java/org/littlewings/SpringIntegrationExampleApplication.java src/test/java/org/littlewings/SpringIntegrationExampleApplicationTests.java

Redisへの接続情報は、application.propertiesに記載しておきます。

src/main/resources/application.properties

spring.redis.host=172.17.0.2
spring.redis.port=6379
spring.redis.password=redispass

プログラムを作成する

では、プログラムを作成していきます。

まずは、特に変わったところのないmainクラス。

src/main/java/org/littlewings/spring/integration/App.java

package org.littlewings.spring.integration;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

メッセージはJSONで受け取ることとして、Javaのオブジェクトに変換するようにしましょう。

こんなクラスにしました。

src/main/java/org/littlewings/spring/integration/MyMessage.java

package org.littlewings.spring.integration;

public class MyMessage {
    String value;

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

あとは、フローを組み立てていきます。

最終的には、こうなりました。

src/main/java/org/littlewings/spring/integration/MessagingConfig.java

package org.littlewings.spring.integration;

import java.io.File;
import java.io.PrintWriter;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.redis.inbound.RedisInboundChannelAdapter;
import org.springframework.integration.stream.CharacterStreamWritingMessageHandler;

@Configuration
public class MessagingConfig {
    @Bean
    public IntegrationFlow redisInputFlow(RedisConnectionFactory redisConnectionFactory) {
        RedisInboundChannelAdapter redisInboundChannelAdapter = new RedisInboundChannelAdapter(redisConnectionFactory);
        redisInboundChannelAdapter.setTopics("topic1", "topic2");

        return IntegrationFlows
                .from(redisInboundChannelAdapter)
                .transform(Transformers.fromJson(MyMessage.class))
                //.channel("redisInput")
                .channel(MessageChannels.publishSubscribe("redisInput"))
                .get();
    }

    @Bean
    public IntegrationFlow writeStdoutFlow() {
        CharacterStreamWritingMessageHandler handler = new CharacterStreamWritingMessageHandler(new PrintWriter(System.out));
        handler.appendNewLine(true);

        return IntegrationFlows
                .from("redisInput")
                .transform("'★★★' + payload.value.toUpperCase() + '★★★'")
                .handle(handler)
                .get();
    }

    @Bean
    public IntegrationFlow writeFileFlow() {
        FileWritingMessageHandler handler =
                Files
                        .outboundAdapter(new File("target/file-outputs"))
                        .appendNewLine(true)
                        .get();

        return IntegrationFlows
                .from("redisInput")
                .transform("'message = ' + payload.value")
                .handle(handler)
                .get();
    }
}

なのですが、説明を兼ねて、最初はここからスタートしましょう。

@Configuration
public class MessagingConfig {

    // ここに@Bean定義を書く
}

最初に定義するのは、こちら。

    @Bean
    public IntegrationFlow redisInputFlow(RedisConnectionFactory redisConnectionFactory) {
        RedisInboundChannelAdapter redisInboundChannelAdapter = new RedisInboundChannelAdapter(redisConnectionFactory);
        redisInboundChannelAdapter.setTopics("topic1", "topic2");

        return IntegrationFlows
                .from(redisInboundChannelAdapter)
                .transform(Transformers.fromJson(MyMessage.class))
                .channel("redisInput")
                .get();
    }

Redisの指定したチャネル(今回はtopc1topic2)をサブスクライブするRedisInboundChannelAdapterクラスのインスタンス
作成して

        RedisInboundChannelAdapter redisInboundChannelAdapter = new RedisInboundChannelAdapter(redisConnectionFactory);
        redisInboundChannelAdapter.setTopics("topic1", "topic2");

受け取ったメッセージをStringからMyMessageクラスのインスタンスに変換するMessage Transformerを挟み、Message Channel(redisInput)に
登録します。

        return IntegrationFlows
                .from(redisInboundChannelAdapter)
                .transform(Transformers.fromJson(MyMessage.class))
                .channel("redisInput")
                .get();

ここで、redisInputはデフォルトのMessage Channelを使用しているのでDirectChannelになります。

RedisInboundChannelAdapterクラスは、spring-integration-redisに含まれているものですね。

そして、redisInputからメッセージを読み取り、Message Transformerで加工してから標準出力に書き出します。

    @Bean
    public IntegrationFlow writeStdoutFlow() {
        CharacterStreamWritingMessageHandler handler = new CharacterStreamWritingMessageHandler(new PrintWriter(System.out));
        handler.appendNewLine(true);

        return IntegrationFlows
                .from("redisInput")
                .transform("'★★★' + payload.value.toUpperCase() + '★★★'")
                .handle(handler)
                .get();
    }

Message Transformerには、SpELを使うこともできます。今回は少し装飾して、大文字にしています。

CharacterStreamWritingMessageHandlerクラスは、spring-integration-streamに含まれています。

ここまでで、1度確認してみましょう。

パッケージングして

$ mvn package

起動。

$ java -jar target/spring-integration-example-0.0.1-SNAPSHOT.jar

起動したら、Redisに接続して

$ bin/redis-cli -a redispass

いくつかメッセージをPUBLISHしてみます。

127.0.0.1:6379> PUBLISH topic1 '{ "value": "Hello World" }'
(integer) 1
127.0.0.1:6379> PUBLISH topic1 '{ "value": "Hello Spring Integration" }'
(integer) 1
127.0.0.1:6379> PUBLISH topic1 '{ "value": "Hello Spring Redis" }'
(integer) 1

アプリケーション側のコンソールを確認してみると、こんな表示が行われています。

★★★HELLO WORLD★★★
★★★HELLO SPRING INTEGRATION★★★
★★★HELLO SPRING REDIS★★★

OKですね。

ここで1度アプリケーションを停止し、以下のようなBean定義を追加します。

    @Bean
    public IntegrationFlow writeFileFlow() {
        FileWritingMessageHandler handler =
                Files
                        .outboundAdapter(new File("target/file-outputs"))
                        .appendNewLine(true)
                        .get();

        return IntegrationFlows
                .from("redisInput")
                .transform("'message = ' + payload.value")
                .handle(handler)
                .get();
    }

redisInput Message Channelからメッセージを取得しているのは、先ほどの標準出力に書き出しているフローと同じです。
メッセージの加工内容は異なるものにして、ファイルに書き出すようにしています。

FileWritingMessageHandlerクラスは、spring-integration-fileに含まれています。 なります。

このままパッケージングして起動してもいいのですが、最初にRedisからサブスクライブしたメッセージを登録する先がDirectChannelでした。
DirectChannelではひとつのConsumerしかメッセージを受け取れない(point-to-point)ので、変更しましょう。

PublishSubscribeChannelクラスになるように、Redisからサブスクライブするフローを修正します。

    @Bean
    public IntegrationFlow redisInputFlow(RedisConnectionFactory redisConnectionFactory) {
        RedisInboundChannelAdapter redisInboundChannelAdapter = new RedisInboundChannelAdapter(redisConnectionFactory);
        redisInboundChannelAdapter.setTopics("topic1", "topic2");

        return IntegrationFlows
                .from(redisInboundChannelAdapter)
                .transform(Transformers.fromJson(MyMessage.class))
                //.channel("redisInput")
                .channel(MessageChannels.publishSubscribe("redisInput"))
                .get();
    }

再度パッケージングして、起動。

$ mvn package
$ java -jar target/spring-integration-example-0.0.1-SNAPSHOT.jar

今度は、topic2に対してメッセージを送ってみます。

127.0.0.1:6379> PUBLISH topic2 '{ "value": "Redis Support" }'
(integer) 1
127.0.0.1:6379> PUBLISH topic2 '{ "value": "Stream Support" }'
(integer) 1
127.0.0.1:6379> PUBLISH topic2 '{ "value": "File Support" }'
(integer) 1

コンソール側の出力。

★★★REDIS SUPPORT★★★
★★★STREAM SUPPORT★★★
★★★FILE SUPPORT★★★

ファイルは、FileWritingMessageHandlerクラスのインスタンス作成時に指定した、ディレクトリ配下にメッセージ単位で出力されます。

こんな感じですね。

$ ll target/file-outputs
合計 20
drwxrwxr-x 2 xxxxx xxxxx 4096  4月  3 02:02 ./
drwxrwxr-x 9 xxxxx xxxxx 4096  4月  3 02:01 ../
-rw-rw-r-- 1 xxxxx xxxxx   24  4月  3 02:02 0ac4a3f2-f3f7-6766-0611-480346e0e45d.msg
-rw-rw-r-- 1 xxxxx xxxxx   23  4月  3 02:02 3699cfa8-1290-09a5-45ff-2bcde5cb79fa.msg
-rw-rw-r-- 1 xxxxx xxxxx   25  4月  3 02:02 858d2fb0-e8d0-148e-199c-4bfe12bbf30f.msg

ファイルの中身を見てみましょう。

$ cat target/file-outputs/0ac4a3f2-f3f7-6766-0611-480346e0e45d.msg
message = Redis Support
cat target/file-outputs/858d2fb0-e8d0-148e-199c-4bfe12bbf30f.msg
message = Stream Support
$ cat target/file-outputs/3699cfa8-1290-09a5-45ff-2bcde5cb79fa.msg
message = File Support

OKですね。

まとめ

Spring Integrationを初めて使ってみました。

なんとなく持ったイメージと、実際の動きは大きく外れていないのですが。あまり情報がないのと、XMLでの定義例が多かったり、そもそも
ドキュメントに登場人物(用語)が多いので、このあたりを把握するのにかなりてこずりました。

まあ、多少はつかめるようになったので、良いかなとは思いますが…。