これは、なにをしたくて書いたもの?
Spring Integrationを少し試しておきたいな、と思いまして。
Spring Integration
Spring Integrationは、EIP(Enterprise Integration Patterns)をSpringで実現するためのものです。
Extends the Spring programming model to support the well-known Enterprise Integration Patterns.
宣言的なアダプターを使い、外部システムとSpringベースのアプリケーションを軽量なメッセージングで統合します。
Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters.
ドキュメントは、こちら。
Spring Integration Reference Guide
こちらの資料が、Spring Integrationについてわかりやすくまとめられていると思います。
システム間連携を担うSpring Integrationのエンタープライズ開発での活用
Spring Integrationにおける、主な要素はこちらのページに書かれてあります。
Overview of Spring Integration Framework
まずはメインコンポーネント。
Spring Integration Overview / Main Components
- Message … メッセージそのものを指し、ヘッダーとペイロードで構成される
- Message にインターフェースについての説明が記載
- Message Channel … 各コンポーネントをつなぐパイプの役割で、Messageの送信側(Producer)と受信側(Consumer)の関係を決定する
- 以下の2つの構成が可能
- "point-to-point" … Message Channelに送信されたメッセージを受け取るConsumerはひとつ
- "publish-subscribe" … Message Channelに送信されたメッセージを、すべてのConsumerにブロードキャストする
- さらに、Consumerに対しては以下の2つのオプションを選択できる
- pollable … バッファリング可能、能動的に受信(polling)
- subscribable … バッファリング不可、そのままMessageを受け取る
- 以下の2つの構成が可能
- Message Endpoint … アプリケーションコードをメッセージングフレームワークに接続する役割を持ち、アーキテクチャー内のフィルター役を担う
- MVCパラダイムにおける、Controllerの役割に近い
- 複数のEndpointの種類がある
- Message Transformer … Messageの変換を行う
- Message Filter … Messageを出力Channelに渡すかどうかを決定する。多くの場合、publish-subscribeと組み合わせる
- Message Router … 次にMessageを送るChannelを決定する
- Splitter … Messageを入力Channelから受け取り、Messageを複数のMessageに分割、それぞれを出力Channelに送る
- Aggregator … 複数のMessageを統合してひとつのMessageにする。Splitterの逆。
- Service Activator … サービスインスタンスをメッセージングシステムに接続するための、汎用のEndpoint
- Channel Adapter … Message Channelを他のシステムまたはトランスポートに接続するEndpointであり、入力や出力を別システムとすることができる
プログラミングの際のポイントは、こちらを参照。
Spring Integration Overview / Programming Tips and Tricks
また、これらの主要なAPIについては以下のページに記載されています。
眺めていてもよくわからないので、メモ的に書いていってみます…。
Message Channelについては、こちら。
Messaging Channels / Message Channels
MessageChannel
インターフェース- サブインターフェース
PollableChannel
インターフェース … Consumerがバッファリングできることを踏まえ、ポーリングが行えることを表すインターフェースSubscribableChannel
インターフェース … サブスクライブしているインスタンスに、メッセージを直接渡すことを表すインターフェース。ポーリングは行わない
- 実装
PublishSubscribeChannel
クラス …SubscribableChannel
インターフェースの実装であり、MessageをサブスクライブしているハンドラーにブロードキャストするQueueChannel
クラス …PollableChannel
インターフェースの実装であり、MessageをFIFOキューに格納し、ひとつのConsumerにメッセージを送信するPriorityChannel
クラス …QueueChannel
クラスとほぼ同じだが、キューがMessageの優先順位によって制御される(PriorityQueue)RendezvousChannel
クラス …QueueChannel
クラスに似ているが、ProducerがConsumerに直接Messageを渡すかのような動きになる(長さ0のキューに類似)DirectChannel
クラス …SubscribableChannel
インターフェースの実装で、単一のハンドラーにMessageを渡す。この時、Messageの送受信側の両方は、単一のスレッドで動作する。Spring IntegrationのデフォルトのChannelExecutorChannel
クラス …DirectChannel
クラスと似ているが、TaskExecutor
に処理がディスパッチされることが異なる点。メッセージの送受信側がそれぞれ別のスレッドになる可能性がある
- サブインターフェース
Pollerでは、Spring Integrationにおいてどのようにポーリングが機能しているかについて書かれています。
Channel Adapterでは、Messageの送信側、受信側に関する記述、設定について書かれています。
Messaging Channels / Channel Adapter
Spring Integrationでデータベースやファイル等からデータを読み書き(Inbound/Outbound)する際には、このChannel Adapterを使用します。
Messaging Bridgeでは、2つのMessage ChannelまたはChannel Adapterを接続する方法について書かれています。
Messaging Channels / Messaging Bridge
Channel Adapter、Message Bridgeについては設定例が書かれているので、見るとイメージが湧きやすいかなと思います。
設定はXMLかDSLで行うようですが、個人的にはできればDSLがいいかな、と思います。
Message Endpointについては、以下にさらに詳細にまとまっているのですが。
Messaging Gatewayという、リクエスト/レスポンスの形態を取れるEndpointもあるようです。
Spring IntegrationでサポートしているEndpointは、こちら。
- AMQP
- ApplicationEvents(Spring)
- Feed(RSS/ATOM)
- File
- FTP/FTPS
- Pivotal Gemfire
- HTTP
- JDBC
- JMS
- JMX
- JPA
- Apache Cassandra
- MongoDB
- MQTT
- R2DBC
- Redis
- Resource(Spring)
- RMI
- RSocket
- SFTP
- STOMP
- Stream
- Syslog
- TCP
- UDP
- WebFlux
- Web Services
- WebSocket
- XMPP
- ZeroMQ
各EndpointがInbound Channel Adapter、Outbound Channel Adapter、Inbound Gateway、Outbound Gatewayのいずれを備えるかは、
以下にQuick Referenceがあります。
その他、サンプルは以下のページやGitHubリポジトリにあります。
Getting Started | Integrating Data
だいぶ長くなりましたが、そろそろ実際に試してみましょう。
お題
今回のお題は、以下にしましょう。
- Redisをサブスクライブしてメッセージを受信
- メッセージはJSONとし、オブジェクトに変換
- オブジェクトの内容を加工して、標準出力およびファイルに出力
使用するのは、以下ですね。
環境
今回の環境は、こちら。
$ java --version openjdk 17.0.2 2022-01-18 OpenJDK Runtime Environment (build 17.0.2+8-Ubuntu-120.04) OpenJDK 64-Bit Server VM (build 17.0.2+8-Ubuntu-120.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 17.0.2, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-107-generic", arch: "amd64", family: "unix"
Redisは172.17.0.2で動作しているものとし、パスワードはredispass
とします。
バージョンはこちら。
$ bin/redis-server --version Redis server v=6.2.6 sha=00000000:0 malloc=jemalloc-5.1.0 bits=64 build=7c7087afbd1829d1
プロジェクトの作成
まずは、Spring Initializrを使ってプロジェクト作成します。
$ curl -s https://start.spring.io/starter.tgz \ -d bootVersion=2.6.6 \ -d javaVersion=17 \ -d name=spring-integration-example \ -d groupId=org.littlewings \ -d artifactId=spring-integration-example \ -d version=0.0.1-SNAPSHOT \ -d packageName=org.littlewings \ -d dependencies=web,integration,data-redis \ -d baseDir=spring-integration-example | tar zxvf -
依存関係にはweb
、integration
、data-redis
を含めていますが、これだけではまだ足りません。
プロジェクト内へ移動。
$ cd spring-integration-example
この時点での依存関係等。
<properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-http</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
依存関係は、以下のように変更します。
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-http</artifactId> </dependency> --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-file</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-test</artifactId> <scope>test</scope> </dependency> </dependencies>
spring-integration-http
は使わないので削除。spring-integration-redis
、spring-integration-stream
、spring-integration-file
を追加。
なお、Spring Data Redisはspring-integration-redis
が使用するので必要です。spring-integration-redis
自体の依存関係にも含まれて
いるのですが、まるっとspring-boot-starter-data-redis
を使うことにしました。
最初から含まれているソースコードは削除。
$ rm src/main/java/org/littlewings/SpringIntegrationExampleApplication.java src/test/java/org/littlewings/SpringIntegrationExampleApplicationTests.java
Redisへの接続情報は、application.properties
に記載しておきます。
src/main/resources/application.properties
spring.redis.host=172.17.0.2 spring.redis.port=6379 spring.redis.password=redispass
プログラムを作成する
では、プログラムを作成していきます。
まずは、特に変わったところのないmainクラス。
src/main/java/org/littlewings/spring/integration/App.java
package org.littlewings.spring.integration; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class App { public static void main(String... args) { SpringApplication.run(App.class, args); } }
メッセージはJSONで受け取ることとして、Javaのオブジェクトに変換するようにしましょう。
こんなクラスにしました。
src/main/java/org/littlewings/spring/integration/MyMessage.java
package org.littlewings.spring.integration; public class MyMessage { String value; public String getValue() { return value; } public void setValue(String value) { this.value = value; } }
あとは、フローを組み立てていきます。
最終的には、こうなりました。
src/main/java/org/littlewings/spring/integration/MessagingConfig.java
package org.littlewings.spring.integration; import java.io.File; import java.io.PrintWriter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.MessageChannels; import org.springframework.integration.dsl.Transformers; import org.springframework.integration.file.FileWritingMessageHandler; import org.springframework.integration.file.dsl.Files; import org.springframework.integration.redis.inbound.RedisInboundChannelAdapter; import org.springframework.integration.stream.CharacterStreamWritingMessageHandler; @Configuration public class MessagingConfig { @Bean public IntegrationFlow redisInputFlow(RedisConnectionFactory redisConnectionFactory) { RedisInboundChannelAdapter redisInboundChannelAdapter = new RedisInboundChannelAdapter(redisConnectionFactory); redisInboundChannelAdapter.setTopics("topic1", "topic2"); return IntegrationFlows .from(redisInboundChannelAdapter) .transform(Transformers.fromJson(MyMessage.class)) //.channel("redisInput") .channel(MessageChannels.publishSubscribe("redisInput")) .get(); } @Bean public IntegrationFlow writeStdoutFlow() { CharacterStreamWritingMessageHandler handler = new CharacterStreamWritingMessageHandler(new PrintWriter(System.out)); handler.appendNewLine(true); return IntegrationFlows .from("redisInput") .transform("'★★★' + payload.value.toUpperCase() + '★★★'") .handle(handler) .get(); } @Bean public IntegrationFlow writeFileFlow() { FileWritingMessageHandler handler = Files .outboundAdapter(new File("target/file-outputs")) .appendNewLine(true) .get(); return IntegrationFlows .from("redisInput") .transform("'message = ' + payload.value") .handle(handler) .get(); } }
なのですが、説明を兼ねて、最初はここからスタートしましょう。
@Configuration public class MessagingConfig { // ここに@Bean定義を書く }
最初に定義するのは、こちら。
@Bean public IntegrationFlow redisInputFlow(RedisConnectionFactory redisConnectionFactory) { RedisInboundChannelAdapter redisInboundChannelAdapter = new RedisInboundChannelAdapter(redisConnectionFactory); redisInboundChannelAdapter.setTopics("topic1", "topic2"); return IntegrationFlows .from(redisInboundChannelAdapter) .transform(Transformers.fromJson(MyMessage.class)) .channel("redisInput") .get(); }
Redisの指定したチャネル(今回はtopc1
、topic2
)をサブスクライブするRedisInboundChannelAdapter
クラスのインスタンスを
作成して
RedisInboundChannelAdapter redisInboundChannelAdapter = new RedisInboundChannelAdapter(redisConnectionFactory); redisInboundChannelAdapter.setTopics("topic1", "topic2");
受け取ったメッセージをString
からMyMessage
クラスのインスタンスに変換するMessage Transformerを挟み、Message Channel(redisInput
)に
登録します。
return IntegrationFlows .from(redisInboundChannelAdapter) .transform(Transformers.fromJson(MyMessage.class)) .channel("redisInput") .get();
ここで、redisInput
はデフォルトのMessage Channelを使用しているのでDirectChannel
になります。
RedisInboundChannelAdapter
クラスは、spring-integration-redis
に含まれているものですね。
そして、redisInput
からメッセージを読み取り、Message Transformerで加工してから標準出力に書き出します。
@Bean public IntegrationFlow writeStdoutFlow() { CharacterStreamWritingMessageHandler handler = new CharacterStreamWritingMessageHandler(new PrintWriter(System.out)); handler.appendNewLine(true); return IntegrationFlows .from("redisInput") .transform("'★★★' + payload.value.toUpperCase() + '★★★'") .handle(handler) .get(); }
Message Transformerには、SpELを使うこともできます。今回は少し装飾して、大文字にしています。
CharacterStreamWritingMessageHandler
クラスは、spring-integration-stream
に含まれています。
ここまでで、1度確認してみましょう。
パッケージングして
$ mvn package
起動。
$ java -jar target/spring-integration-example-0.0.1-SNAPSHOT.jar
起動したら、Redisに接続して
$ bin/redis-cli -a redispass
いくつかメッセージをPUBLISH
してみます。
127.0.0.1:6379> PUBLISH topic1 '{ "value": "Hello World" }' (integer) 1 127.0.0.1:6379> PUBLISH topic1 '{ "value": "Hello Spring Integration" }' (integer) 1 127.0.0.1:6379> PUBLISH topic1 '{ "value": "Hello Spring Redis" }' (integer) 1
アプリケーション側のコンソールを確認してみると、こんな表示が行われています。
★★★HELLO WORLD★★★ ★★★HELLO SPRING INTEGRATION★★★ ★★★HELLO SPRING REDIS★★★
OKですね。
ここで1度アプリケーションを停止し、以下のようなBean定義を追加します。
@Bean public IntegrationFlow writeFileFlow() { FileWritingMessageHandler handler = Files .outboundAdapter(new File("target/file-outputs")) .appendNewLine(true) .get(); return IntegrationFlows .from("redisInput") .transform("'message = ' + payload.value") .handle(handler) .get(); }
redisInput
Message Channelからメッセージを取得しているのは、先ほどの標準出力に書き出しているフローと同じです。
メッセージの加工内容は異なるものにして、ファイルに書き出すようにしています。
FileWritingMessageHandler
クラスは、spring-integration-file
に含まれています。
なります。
このままパッケージングして起動してもいいのですが、最初にRedisからサブスクライブしたメッセージを登録する先がDirectChannel
でした。
DirectChannel
ではひとつのConsumerしかメッセージを受け取れない(point-to-point)ので、変更しましょう。
PublishSubscribeChannel
クラスになるように、Redisからサブスクライブするフローを修正します。
@Bean public IntegrationFlow redisInputFlow(RedisConnectionFactory redisConnectionFactory) { RedisInboundChannelAdapter redisInboundChannelAdapter = new RedisInboundChannelAdapter(redisConnectionFactory); redisInboundChannelAdapter.setTopics("topic1", "topic2"); return IntegrationFlows .from(redisInboundChannelAdapter) .transform(Transformers.fromJson(MyMessage.class)) //.channel("redisInput") .channel(MessageChannels.publishSubscribe("redisInput")) .get(); }
再度パッケージングして、起動。
$ mvn package $ java -jar target/spring-integration-example-0.0.1-SNAPSHOT.jar
今度は、topic2
に対してメッセージを送ってみます。
127.0.0.1:6379> PUBLISH topic2 '{ "value": "Redis Support" }' (integer) 1 127.0.0.1:6379> PUBLISH topic2 '{ "value": "Stream Support" }' (integer) 1 127.0.0.1:6379> PUBLISH topic2 '{ "value": "File Support" }' (integer) 1
コンソール側の出力。
★★★REDIS SUPPORT★★★ ★★★STREAM SUPPORT★★★ ★★★FILE SUPPORT★★★
ファイルは、FileWritingMessageHandler
クラスのインスタンス作成時に指定した、ディレクトリ配下にメッセージ単位で出力されます。
こんな感じですね。
$ ll target/file-outputs 合計 20 drwxrwxr-x 2 xxxxx xxxxx 4096 4月 3 02:02 ./ drwxrwxr-x 9 xxxxx xxxxx 4096 4月 3 02:01 ../ -rw-rw-r-- 1 xxxxx xxxxx 24 4月 3 02:02 0ac4a3f2-f3f7-6766-0611-480346e0e45d.msg -rw-rw-r-- 1 xxxxx xxxxx 23 4月 3 02:02 3699cfa8-1290-09a5-45ff-2bcde5cb79fa.msg -rw-rw-r-- 1 xxxxx xxxxx 25 4月 3 02:02 858d2fb0-e8d0-148e-199c-4bfe12bbf30f.msg
ファイルの中身を見てみましょう。
$ cat target/file-outputs/0ac4a3f2-f3f7-6766-0611-480346e0e45d.msg message = Redis Support
cat target/file-outputs/858d2fb0-e8d0-148e-199c-4bfe12bbf30f.msg message = Stream Support
$ cat target/file-outputs/3699cfa8-1290-09a5-45ff-2bcde5cb79fa.msg message = File Support
OKですね。
まとめ
Spring Integrationを初めて使ってみました。
なんとなく持ったイメージと、実際の動きは大きく外れていないのですが。あまり情報がないのと、XMLでの定義例が多かったり、そもそも
ドキュメントに登場人物(用語)が多いので、このあたりを把握するのにかなりてこずりました。
まあ、多少はつかめるようになったので、良いかなとは思いますが…。