CLOVER🍀

That was when it all began.

Spring IntegrationのJDBCサポートを使って、データベースポーリングを試してみる

これは、なにをしたくて書いたもの?

Spring Integrationで、JDBCを使ったデータベースポーリングをサポートしているというので、ちょっと試してみました。

Spring IntegrationのポーリングとJDBCサポート

Spring Integrationでは、ポーリングの機能があります。

Messaging Channels / Poller

ポーリングを使うようなユースケースには以下のようなものがあり、この中のひとつにデータベースが含まれています。

  • Polling certain external systems, such as FTP Servers, Databases, and Web Services
  • Polling internal (pollable) message channels
  • Polling internal services (such as repeatedly executing methods on a Java class)

Messaging Channels / Pollable Message Source

また、Spring IntegrationにはJDBCのサポートが含まれています。

JDBC Support

Inbound Channel Adapter、Outbound Channel Adapter、Outbound Gatewayなどがあります。

今回は、Inbound Channel AdapterとOutbound Channel Adapterを使ってみたいと思います。

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.3 2022-04-19
OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-109-generic", arch: "amd64", family: "unix"

ポーリングするデータベースには、MySQLを使います。

$ mysql --version
mysql  Ver 8.0.29 for Linux on x86_64 (MySQL Community Server - GPL)

MySQLは、172.17.0.2で動作しているものとします。

お題

今回のお題は、以下とします。

  • JDBCサポートのInbound Channel Adapterを使い、テーブルに登録されたデータを10秒間隔でポーリング
    • テーブルにステータスを持ち、0のレコードを処理対象とし、処理が終わったら1とする

テーブルの定義は、以下とします。

create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);
  • ポーリングで取得したデータに対しては、以下の処理を行う
    • 標準出力に書き出す
    • JDBCサポートのOutbound Channel Adapterを使って、別のテーブルに書き出す

別のテーブルというのは、以下の定義とします。

create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);

出力処理は、交互にやっていきます。

プロジェクトを作成する

まずは、Spring InitializrでSpring Bootプロジェクトを作成します。

依存関係には、integrationjdbcmysqlを加えます。

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.6.7 \
  -d javaVersion=17 \
  -d name=integration-jdbc-polling \
  -d groupId=org.littlewings \
  -d artifactId=integration-jdbc-polling \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.integration \
  -d dependencies=integration,jdbc,mysql \
  -d baseDir=integration-jdbc-polling | tar zxvf -

プロジェクト内に移動。

$ cd integration-jdbc-polling

生成されたソースコードは、とりあえず削除。

$ rm src/main/java/org/littlewings/spring/integration/IntegrationJdbcPollingApplication.java src/test/java/org/littlewings/spring/integration/IntegrationJdbcPollingApplicationTests.java

Mavenの依存関係や、プラグインの設定はこちら。

        <properties>
                <java.version>17</java.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-integration</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-jdbc</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-jdbc</artifactId>
                </dependency>

                <dependency>
                        <groupId>mysql</groupId>
                        <artifactId>mysql-connector-java</artifactId>
                        <scope>runtime</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-test</artifactId>
                        <scope>test</scope>
                </dependency>
        </dependencies>

        <build>
                <plugins>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
                        </plugin>
                </plugins>
        </build>

今回のポイントは、spring-integration-jdbcですね。

なお、初めてSpring Integrationを試した時はRedisのInbound Channel Adapterを使ったのですが、この時はwebがないと(サーバーがないと)
終了してしまって困って追加したのですが、今回はそんなことはなさそうでした。

Spring Integrationを試してみる - CLOVER🍀

まず、mainクラスだけ作成。

src/main/java/org/littlewings/spring/integration/App.java

package org.littlewings.spring.integration;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

データベースをポーリングして、取得したデータを標準出力に書き出す

最初は、こちらから。

  • JDBCサポートのInbound Channel Adapterを使い、テーブルに登録されたデータを10秒間隔でポーリング
    • テーブルにステータスを持ち、0のレコードを処理対象とし、処理が終わったら1とする

schema.sqlで、テーブル定義。

src/main/resources/schema.sql

drop table if exists polling_message;

create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);

テーブルに対応するクラスも作成。

src/main/java/org/littlewings/spring/integration/PollingMessage.java

package org.littlewings.spring.integration;

public class PollingMessage {

    Integer id;
    String messageText;
    Integer processStatus;

    // getter/setterは省略
}

このクラスは今回必須ではないのですが、ポーリングして取得した結果を、このクラスにマッピングすることにします。

そして、組み立てたフロー。

src/main/java/org/littlewings/spring/integration/JdbcPollingConfig.java

package org.littlewings.spring.integration;

import java.time.Duration;
import java.util.List;
import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.messaging.MessageHandler;

@Configuration
public class JdbcPollingConfig {
    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

        return jdbcPollingChannelAdapter;
    }

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

    @Bean
    public IntegrationFlow stdout() {
        Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
                })
                .get();
    }
}

Inbound Channel Adapterの部分は、ドキュメントはこのあたりを参考にしつつ。

JDBC Support / Inbound Channel Adapter

このように書いています。

    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

        return jdbcPollingChannelAdapter;
    }

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

順に説明していくと。

JdbcPollingChannelAdapterを使い、アクセスするDataSourceとポーリングの際に使用するSQLを指定。

        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

ポーリング後の更新SQL。こちらで、ステータスを変更します。

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");

:付きのパラメーターは、ポーリングで取得した各行の値を指定しているもので、Spring JDBCの機能を利用しています。

The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which, in the preceding example, is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC, combined with a convention (projection onto the polled result list) adopted in Spring Integration.

JDBC Support / Inbound Channel Adapter

ちなみに、複数渡ってくるようなので、where句の条件がinになっています。

そして、取得結果を先ほど作成したクラスにマッピング

        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

ポーリング自体の設定はこちら。

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

先ほどのJdbcPollingChannelAdapterと、ポーリングの間隔を10秒にしつつ、トランザクションを有効にしています。

                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )

トランザクションを有効にすると、selectupdateが同じトランザクションで実行されます。

the update and select queries are both executed in the same transaction.

また、ダウンストリームを(デフォルトの)DirectChannelとすると、エンドポイントが同じスレッドで動作するので、トランザクション
同じになります。

A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread and, hence, the same transaction. That way, if any of them fail, the transaction rolls back and the input data is reverted to its original state.

この場合、ダウンストリームで失敗した場合は、全体がロールバックされることになりますね。

JDBC Support / Inbound Channel Adapter / Polling and Transactions

また、Inbound Channel Adapterの書き方の例は、Java DSLの方も参考になります。

Java DSL / Inbound Channel Adapters

次に、標準出力に書き出す方。

    @Bean
    public IntegrationFlow stdout() {
        Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
                })
                .get();
    }

JdbcPollingChannelAdapterBeanPropertyRowMapperを使って、クエリーの結果をPollingMessageとして扱うことができます。
Listではありますが。

                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();

あとはメッセージをログ出力です。

                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );

コメントアウトしている部分は、スタックトレースを見たり、例外を投げてトランザクションロールバックされることを確認するのに
使ったりしていました。

設定。

src/main/resources/application.properties

spring.datasource.url=jdbc:mysql://172.17.0.2:3306/practice?characterEncoding=utf-8
spring.datasource.username=kazuhira
spring.datasource.password=password

spring.sql.init.mode=always

logging.level.org.springframework.jdbc.core=debug
logging.level.org.springframework.jdbc.support=debug

実行しているSQLや、トランザクションの様子をログ出力するようにしました。

起動時にデータも登録するようにしました。ポーリングの取得対象外にするデータも含めています。

src/main/resources/data.sql

insert into polling_message(message_text, process_status) values('Hello World!!', 0);
insert into polling_message(message_text, process_status) values('[ignore] こんにちは、世界', 9);
insert into polling_message(message_text, process_status) values('Hello Spring Integration', 0);
insert into polling_message(message_text, process_status) values('Hello Spring Boot', 0);
insert into polling_message(message_text, process_status) values('[ignore] Hello MySQL!!', 9);

では、動かしてみましょう。

パッケージングして

$ mvn package

実行。

$ java -jar target/integration-jdbc-polling-0.0.1-SNAPSHOT.jar

起動した時点での、テーブルの状態。

mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              0 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              0 |
|  4 | Hello Spring Boot                 |              0 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)

ログを見てみます。

2022-05-04 03:05:23.377 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:05:23.378 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:05:23.380 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:05:23.382  INFO 69669 --- [           main] org.littlewings.spring.integration.App   : Started App in 3.857 seconds (JVM running for 4.49)
2022-05-04 03:05:23.387 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:05:23.388 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:05:23.449 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:05:23.459 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:05:23.459 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:05:23.470 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:05:23.471 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:05:23.497  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 1, message_text = Hello World!!, process_status = 0
2022-05-04 03:05:23.500  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 3, message_text = Hello Spring Integration, process_status = 0
2022-05-04 03:05:23.500  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 4, message_text = Hello Spring Boot, process_status = 0
2022-05-04 03:05:23.501 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:05:23.502 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:05:23.523 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction
2022-05-04 03:05:33.363 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:05:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:05:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:05:33.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:05:33.367 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:05:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:05:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:05:33.370 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction

トランザクションの開始。

2022-05-04 03:05:23.377 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:05:23.378 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction

select文の実行。

2022-05-04 03:05:23.387 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:05:23.388 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]

update文の実行。

2022-05-04 03:05:23.470 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:05:23.471 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]

取得したレコードの出力。

2022-05-04 03:05:23.497  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 1, message_text = Hello World!!, process_status = 0
2022-05-04 03:05:23.500  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 3, message_text = Hello Spring Integration, process_status = 0
2022-05-04 03:05:23.500  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 4, message_text = Hello Spring Boot, process_status = 0

トランザクションのコミットまでの様子がわかります。

2022-05-04 03:05:23.501 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:05:23.502 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]

スレッド名を見ると、一連の処理が同じスレッドで動作しているようです。

その後は、10秒おきにselect文が実行されますが、すでにデータが更新されているので処理対象となるデータはありません。

2022-05-04 03:05:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:05:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:05:33.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:05:33.367 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:05:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:05:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:05:33.370 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction

テーブルの状態を見ると、このようになっています。

mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              1 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              1 |
|  4 | Hello Spring Boot                 |              1 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)

ポーリングで使用しているselect文では、process_status0のもののみを対象にしているので、9にしていたレコードはそのまま
残っていますね。

続いて、データを追加してみます。処理対象外のデータも含めています。

mysql> insert into polling_message(message_text, process_status) values('Spring Integration JDBC Support', 0);
Query OK, 1 row affected (0.03 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] Hello InnoDB', 9);
Query OK, 1 row affected (0.03 sec)

mysql> insert into polling_message(message_text, process_status) values('Hello MySQL Connector/J', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] はじめてのSpring Integration', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('JDBCでデータベースポーリング', 0);
Query OK, 1 row affected (0.02 sec)

すると、次回のポーリング時にデータが取得され、標準出力に書き出されます。

2022-05-04 03:08:23.363 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:08:23.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:08:23.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:08:23.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:08:23.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:08:23.368 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:08:23.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:08:23.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:08:23.370 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:08:23.370 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:08:23.372  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 6, message_text = Spring Integration JDBC Support, process_status = 0
2022-05-04 03:08:23.372  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 8, message_text = Hello MySQL Connector/J, process_status = 0
2022-05-04 03:08:23.372  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 10, message_text = JDBCでデータベースポーリング, process_status = 0
2022-05-04 03:08:23.372 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:08:23.372 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:08:23.462 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction

その後は、また処理対象のデータがなくなります。

2022-05-04 03:08:33.364 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:08:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:08:33.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:08:33.367 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:08:33.367 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:08:33.368 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:08:33.368 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:08:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction

データの状態。

mysql> select * from polling_message;
+----+--------------------------------------------+----------------+
| id | message_text                               | process_status |
+----+--------------------------------------------+----------------+
|  1 | Hello World!!                              |              1 |
|  2 | [ignore] こんにちは、世界                  |              9 |
|  3 | Hello Spring Integration                   |              1 |
|  4 | Hello Spring Boot                          |              1 |
|  5 | [ignore] Hello MySQL!!                     |              9 |
|  6 | Spring Integration JDBC Support            |              1 |
|  7 | [ignore] Hello InnoDB                      |              9 |
|  8 | Hello MySQL Connector/J                    |              1 |
|  9 | [ignore] はじめてのSpring Integration      |              9 |
| 10 | JDBCでデータベースポーリング               |              1 |
+----+--------------------------------------------+----------------+
10 rows in set (0.00 sec)

OKですね。

データベースをポーリングして、取得したデータをテーブルに書き出す

次は、テーブルをポーリングして、取得したデータを今度はテーブルに書き出してみましょう。

Outbound Channel Adapterを使います。

JDBC Support / Outbound Channel Adapter

書き出し先のテーブル定義は、このようにしてみます。

create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);

先ほどのフロー定義を行ったクラス内のBean定義をコメントアウト

    // @Bean
    public IntegrationFlow stdout() {

JdbcMessageHandlerのBean定義と、その定義を使用したフローを組み立てます。

    @Bean
    public MessageHandler jdbcMessageHandler(DataSource dataSource) {
        JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(
                dataSource,
                """
                        insert into processed_message_log(message_text) 
                        values(:payload.messageText)
                        """
        );
        return jdbcMessageHandler;
    }

    @Bean
    public IntegrationFlow writeTable() {
        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(jdbcMessageHandler(null))
                .get();
    }

insert文の:payload.messageTextという表記についてですが。

                """
                        insert into processed_message_log(message_text) 
                        values(:payload.messageText)
                        """

参照しているのは、Message#getPayloadになります。

Message (Spring Framework 5.3.19 API)

ペイロードの中身は、JdbcPollingChannelAdapterBeanPropertyRowMapperを設定しているので、最初に前に作成したPollingMessageクラスが
含まれているので、PollingMessage#getMessageTextを呼び出していることになります。

    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

        return jdbcPollingChannelAdapter;
    }

なお、この時は1件ずつの処理になるみたいですね。

定義したJdbcMessageHandlerを、フローのHandlerとして登録。

    @Bean
    public IntegrationFlow writeTable() {
        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(jdbcMessageHandler(null))
                .get();
    }

schema.sqlは、以下のように変更。テーブルは再作成されるので、先ほどの処理結果は1度なくなります。

src/main/resources/schema.sql

drop table if exists polling_message;

create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);

drop table if exists processed_message_log;

create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);

では、動作確認をしましょう。

パッケージングして、起動。

$ mvn package
$ java -jar target/integration-jdbc-polling-0.0.1-SNAPSHOT.jar

ログは、このようになりました。

2022-05-04 03:18:33.300 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:18:33.301 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:18:33.303 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:18:33.308 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:18:33.308 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:18:33.310  INFO 70288 --- [           main] org.littlewings.spring.integration.App   : Started App in 3.268 seconds (JVM running for 3.712)
2022-05-04 03:18:33.348 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:18:33.350 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:18:33.350 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:18:33.357 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:18:33.358 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:18:33.371 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing SQL batch update [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:18:33.372 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:18:33.373 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcUtils               : JDBC driver supports batch updates
2022-05-04 03:18:33.389 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:18:33.390 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:18:33.423 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction
2022-05-04 03:18:43.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:18:43.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:18:43.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:18:43.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:18:43.297 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:18:43.299 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:18:43.299 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:18:43.300 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction

先ほどと違うのは、取得結果が標準出力に書き出されるのではなく、update文になったことですね。

2022-05-04 03:18:33.371 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing SQL batch update [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:18:33.372 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:18:33.373 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcUtils               : JDBC driver supports batch updates

バッチ更新をしているようです。

コミットは、insert文の後ですね。

2022-05-04 03:18:33.389 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:18:33.390 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]

データの確認をしましょう。

最初の時点ではこの状態で、

mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              0 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              0 |
|  4 | Hello Spring Boot                 |              0 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)

処理が行われた後の結果は先ほどと同じですが、

mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              1 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              1 |
|  4 | Hello Spring Boot                 |              1 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)

処理対象となったデータが、JdbcMessageHandlerによって別のテーブルに登録されていることが確認できます。

mysql> select * from processed_message_log;
+--------+--------------------------+
| log_id | message_text             |
+--------+--------------------------+
|      1 | Hello World!!            |
|      2 | Hello Spring Integration |
|      3 | Hello Spring Boot        |
+--------+--------------------------+
3 rows in set (0.00 sec)

先ほどと同じように、データを追加。

mysql> insert into polling_message(message_text, process_status) values('Spring Integration JDBC Support', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] Hello InnoDB', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('Hello MySQL Connector/J', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] はじめてのSpring Integration', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('JDBCでデータベースポーリング', 0);
Query OK, 1 row affected (0.02 sec)

ログ。

2022-05-04 03:24:03.294 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:24:03.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:24:03.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:24:03.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:24:03.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:24:03.298 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:24:03.298 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:24:03.299 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:24:03.300 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:24:03.300 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing SQL batch update [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcUtils               : JDBC driver supports batch updates
2022-05-04 03:24:03.306 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:24:03.306 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:24:03.662 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction
2022-05-04 03:24:13.294 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:24:13.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:24:13.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:24:13.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:24:13.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:24:13.297 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:24:13.297 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:24:13.298 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction

登録したデータが検出され、処理対象になりました。

2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing SQL batch update [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcUtils               : JDBC driver supports batch updates

データの方も変更されています。

mysql> select * from polling_message;
+----+--------------------------------------------+----------------+
| id | message_text                               | process_status |
+----+--------------------------------------------+----------------+
|  1 | Hello World!!                              |              1 |
|  2 | [ignore] こんにちは、世界                  |              9 |
|  3 | Hello Spring Integration                   |              1 |
|  4 | Hello Spring Boot                          |              1 |
|  5 | [ignore] Hello MySQL!!                     |              9 |
|  6 | Spring Integration JDBC Support            |              1 |
|  7 | [ignore] Hello InnoDB                      |              9 |
|  8 | Hello MySQL Connector/J                    |              1 |
|  9 | [ignore] はじめてのSpring Integration      |              9 |
| 10 | JDBCでデータベースポーリング               |              1 |
+----+--------------------------------------------+----------------+
10 rows in set (0.00 sec)

mysql> select * from processed_message_log;
+--------+------------------------------------------+
| log_id | message_text                             |
+--------+------------------------------------------+
|      1 | Hello World!!                            |
|      2 | Hello Spring Integration                 |
|      3 | Hello Spring Boot                        |
|      4 | Spring Integration JDBC Support          |
|      5 | Hello MySQL Connector/J                  |
|      6 | JDBCでデータベースポーリング             |
+--------+------------------------------------------+
6 rows in set (0.00 sec)

これでOutput Channel Adapterの方も確認できました。

まとめ

Spring IntegrationのJDBCサポートを使って、データベースポーリングを試してみました。

JDBCサポートに関する機能を、Java DSLで表現するところでやや手こずりましたが、それがわかるとけっこうあっさりと動かせましたね。

だんだん、慣れてきた気がします。

最後に、フロー定義していたクラスの全体を載せておきます。Output Channel Adapterを有効にしているバージョンにしておきます。

src/main/java/org/littlewings/spring/integration/JdbcPollingConfig.java

package org.littlewings.spring.integration;

import java.time.Duration;
import java.util.List;
import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.messaging.MessageHandler;

@Configuration
public class JdbcPollingConfig {
    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

        return jdbcPollingChannelAdapter;
    }

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

    // @Bean
    public IntegrationFlow stdout() {
        Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
                })
                .get();
    }

    @Bean
    public MessageHandler jdbcMessageHandler(DataSource dataSource) {
        JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(
                dataSource,
                """
                        insert into processed_message_log(message_text)
                        values(:payload.messageText)
                        """
        );
        return jdbcMessageHandler;
    }

    @Bean
    public IntegrationFlow writeTable() {
        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(jdbcMessageHandler(null))
                .get();
    }
}

Spring IntegrationのGatewayを試す

これは、なにをしたくて書いたもの?

少し前に、Spring Integrationを試してみました。

Spring Integrationを試してみる - CLOVER🍀

他にドキュメントを読んでいて、Gatewayを使ってみないと全体のイメージが掴めない気がするので、1回試してみようかなと。

Gateway

Gatewayは、Overview内にも少し出てきます。request - replyな形式をサポートするようです。

request-reply MessagingGatewaySupport implementations (such as AmqpInboundGateway and AbstractWebServiceInboundGateway).

Spring Integration Overview / Finding Class Names for Java and DSL Configuration

もっと詳細に書かれている、Messaging Gatewayに関するドキュメントはこちら。

Messaging Endpoints / Messaging Gateways

Gatewayは、Spring Integrationによって提供するメッセージングAPIを隠するものだとされています。

A gateway hides the messaging API provided by Spring Integration. It lets your application’s business logic be unaware of the Spring Integration API.

こちらを見ると、インターフェースに対するプロキシを生成し、Spring Integrationを内部で呼び出す仕組みになるようです。

Spring Integration provides the GatewayProxyFactoryBean, which generates a proxy for any interface and internally invokes the gateway methods shown below. By using dependency injection, you can then expose the interface to your business methods.

Messaging Endpoints / Messaging Gateways / Enter the GatewayProxyFactoryBean

Messaging Endpoints / Messaging Gateways / Gateway XML Namespace Support

こうすると、Spring Integrationが使われていることを呼び出し元が意識しなくなる、ということですね。

With this configuration defined, the cafeService can now be injected into other beans, and the code that invokes the methods on that proxied instance of the Cafe interface has no awareness of the Spring Integration API. The general approach is similar to that of Spring Remoting (RMI, HttpInvoker, and so on).

よく似たアプローチを取っているものとして、RMIなどが挙げられています。

このあたりを見ると、裏では(Reply用の)Channelを使ったSpring Integrationの仕組みが動作するようですね。

Typically, you need not specify the default-reply-channel, since a Gateway auto-creates a temporary, anonymous reply channel, where it listens for the reply. However, some cases may prompt you to define a default-reply-channel (or reply-channel with adapter gateways, such as HTTP, JMS, and others).

Messaging Endpoints / Messaging Gateways / Setting the Default Reply Channel

内部的な動作が説明されているので、ちょっと見てみましょう。

  • Gatewayは、一時的なpoint-to-pointなリプライ用のChannelを作成する
    • 匿名であり、メッセージヘッダーが追加される
  • 明示的なdefault-reply-channelを指定する場合(Remote Adapter Gatewayと一緒にreply-channelを指定する場合)は、publish-subscribe Channelを指定できる
    • この場合、複数のSubscriberを指定可能
    • Spring Integrationは、一時的なreply-channelと明示的なdefault-reply-channelとの間にブリッジを作成する

For some background, we briefly discuss some of the inner workings of the gateway. A gateway creates a temporary point-to-point reply channel. It is anonymous and is added to the message headers with the name, replyChannel. When providing an explicit default-reply-channel (reply-channel with remote adapter gateways), you can point to a publish-subscribe channel, which is so named because you can add more than one subscriber to it. Internally, Spring Integration creates a bridge between the temporary replyChannel and the explicitly defined default-reply-channel.

また、リプライをGatewayだけでなく他のConsumerに送信したい場合は、サブスクライブを行うために明示的な名前付けされたChannelを
作る必要があります。そのChannelは、publish-subscribe-channelとして機能します

Suppose you want your reply to go not only to the gateway but also to some other consumer. In this case, you want two things: - A named channel to which you can subscribe - That channel to be a publish-subscribe-channel

Messaging Endpoints / Messaging Gateways / Setting the Default Reply Channel

と、書いていても掴めないところもあるので、あとは実際にソースコードを書いて動かしてみましょう。

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.3 2022-04-19
OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-109-generic", arch: "amd64", family: "unix"

プロジェクトを作成する

まずは、Spring Bootプロジェクトを作成します。依存関係に、webintegrationを加えました。

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.6.7 \
  -d javaVersion=17 \
  -d name=integration-gateway-example \
  -d groupId=org.littlewings \
  -d artifactId=integration-gateway-example \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.integration \
  -d dependencies=web,integration \
  -d baseDir=integration-gateway-example | tar zxvf -

プロジェクト内に移動。

$ cd integration-gateway-example

デフォルトで生成されるソースコードは削除しておきます。

$ rm src/main/java/org/littlewings/spring/integration/IntegrationGatewayExampleApplication.java src/test/java/org/littlewings/spring/integration/IntegrationGatewayExampleApplicationTests.java

Mavenの依存関係やプラグインの設定を見ておきます。

 <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

Spring Web MVCとSpring Integrationを指定すると、spring-integration-httpもついてくるんですね。

こちらのことですね。

HTTP Support

せっかくなので、こちらも使って簡単なサンプルアプリケーションを作成したいと思います。

とりあえず、mainクラスだけは用意しておきます。

src/main/java/org/littlewings/spring/integration/App.java

package org.littlewings.spring.integration;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

お題

お題は、Echoプログラムにしましょう。

こんな感じのクラスをリクエストとして受け取り、

src/main/java/org/littlewings/spring/integration/RequestMessage.java

package org.littlewings.spring.integration;

public class RequestMessage {
    String request;

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }
}

レスポンスをこちらで返すプログラムを作ることにします。

src/main/java/org/littlewings/spring/integration/ResponseMessage.java

package org.littlewings.spring.integration;

public class ResponseMessage {
    String response;

    public String getResponse() {
        return response;
    }

    public void setResponse(String response) {
        this.response = response;
    }
}

リクエストは、JSONで受け取るようにしましょう。

最初はGatewayをシンプルに使い、次にSpring IntegrationのHTTPサポートを使ったパターンを作っていきます。

Gatewayをシンプルに使う

まずは、Gatewayをシンプルに使ってみます。

こちらを見つつ。

Messaging Endpoints / Messaging Gateways / Gateway Configuration with Annotations and XML

Messaging Endpoints / Messaging Gateways / @MessagingGateway Annotation

@MessagingGatewayアノテーションと、@Gatewayアノテーションを付与したインターフェースを作ればよさそうです。

src/main/java/org/littlewings/spring/integration/simple/EchoService.java

package org.littlewings.spring.integration.simple;

import org.littlewings.spring.integration.RequestMessage;
import org.littlewings.spring.integration.ResponseMessage;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway
public interface EchoService {
    @Gateway(requestChannel = "echoRequestChannel")
    ResponseMessage execute(RequestMessage request);
}

request-channelはechoRequestChannelとしておきます。

GatewayProxyFactoryBeanを使って定義する方法もあるようですが。

Messaging Endpoints / Messaging Gateways / / Enter the GatewayProxyFactoryBean

今回は、定義したEchoServiceインターフェースをDIして使うことにするので、アノテーションを使って定義してコンポーネントスキャンして
もらうことにしましょう。

次に、IntegrationFlowを定義します。

src/main/java/org/littlewings/spring/integration/simple/EchoGatewayConfig.java

package org.littlewings.spring.integration.simple;

import org.littlewings.spring.integration.RequestMessage;
import org.littlewings.spring.integration.ResponseMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class EchoGatewayConfig {
    @Bean
    public IntegrationFlow reply() {
        return IntegrationFlows
                .from("echoRequestChannel")
                .<RequestMessage>handle((payload, header) -> {
                    Logger logger = LoggerFactory.getLogger(EchoGatewayConfig.class);
                    header.entrySet().forEach(entry -> logger.info("header: name = {}, value = {}", entry.getKey(), entry.getValue()));

                    String message = payload.getRequest();
                    ResponseMessage response = new ResponseMessage();
                    response.setResponse("★★★ " + message + " ★★★");
                    return response;
                })
                .get();
    }
}

先ほどのインターフェースで定義した、echoRequestChannelをサブスクライブして処理するGenericHandlerを定義しましょう。

Java DSL / Service Activators and the .handle() method

ヘッダーをログ出力して

                    Logger logger = LoggerFactory.getLogger(EchoGatewayConfig.class);
                    header.entrySet().forEach(entry -> logger.info("header: name = {}, value = {}", entry.getKey(), entry.getValue()));

リクエストの内容を加工して返すようにします。

                    String message = payload.getRequest();
                    ResponseMessage response = new ResponseMessage();
                    response.setResponse("★★★ " + message + " ★★★");
                    return response;

では、このインターフェースを使うRestControllerを定義します。

src/main/java/org/littlewings/spring/integration/simple/EchoController.java

package org.littlewings.spring.integration.simple;

import org.littlewings.spring.integration.RequestMessage;
import org.littlewings.spring.integration.ResponseMessage;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EchoController {
    EchoService echoService;

    public EchoController(EchoService echoService) {
        this.echoService = echoService;
    }

    @PostMapping("echo")
    public ResponseMessage echo(@RequestBody RequestMessage request) {
        return echoService.execute(request);
    }
}

では、動かしてみましょう。

$ mvn spring-boot:run

確認。

$ curl -H 'Content-Type: application/json' localhost:8080/echo -d '{"request": "Hello World"}'
{"response":"★★★ Hello World ★★★"}

RestControllerで受け取った内容が

    @PostMapping("echo")
    public ResponseMessage echo(@RequestBody RequestMessage request) {
        return echoService.execute(request);
    }

こちらで定義したGenericHandlerでハンドリングされているのがわかりますね。

    @Bean
    public IntegrationFlow reply() {
        return IntegrationFlows
                .from("echoRequestChannel")
                .<RequestMessage>handle((payload, header) -> {
                    Logger logger = LoggerFactory.getLogger(EchoGatewayConfig.class);
                    header.entrySet().forEach(entry -> logger.info("header: name = {}, value = {}", entry.getKey(), entry.getValue()));

                    String message = payload.getRequest();
                    ResponseMessage response = new ResponseMessage();
                    response.setResponse("★★★ " + message + " ★★★");
                    return response;
                })
                .get();
    }

ヘッダーは、こんな感じでログ出力されました。

2022-05-01 20:51:39.608  INFO 21766 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = replyChannel, value = org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@b9c6c95
2022-05-01 20:51:39.609  INFO 21766 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = errorChannel, value = org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@b9c6c95
2022-05-01 20:51:39.609  INFO 21766 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = id, value = 4834b4ac-ac57-0d4a-5f15-2631733b86d5
2022-05-01 20:51:39.609  INFO 21766 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = timestamp, value = 1651405899607

というわけで、RestControllerから見た時はふつうのインターフェースなのに、裏ではSpring Integrationが動作しているという仕掛けが
Gatewayだということが確認できました。

@RestController
public class EchoController {
    EchoService echoService;

    public EchoController(EchoService echoService) {
        this.echoService = echoService;
    }

    @PostMapping("echo")
    public ResponseMessage echo(@RequestBody RequestMessage request) {
        return echoService.execute(request);
    }
}

ここで、1度アプリケーションを停止しておきます。

HTTPサポートを使ってみる

次に、Spring IntegrationのHTTPサポートを使ってみましょう。

HTTP Support

今回は、自分でGatewayは作成しません。HTTPサポートが提供しているHttpRequestHandlingMessagingGatewayを使います。

HTTP Support / Http Inbound Components

RestControllerも作成せず、HttpRequestHandlingMessagingGatewayがリクエストを受け付け、レスポンスを返します。

なお、今回は使いませんがいわゆるHTTPクライアントに相当するHttpRequestExecutingMessageHandlerもあるようです。

HTTP Support / HTTP Outbound Components

では、Configurationを作成します。

src/main/java/org/littlewings/spring/integration/http/HttpGatewayConfig.java

package org.littlewings.spring.integration.http;

import org.littlewings.spring.integration.simple.EchoService;
import org.littlewings.spring.integration.RequestMessage;
import org.littlewings.spring.integration.ResponseMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.http.dsl.Http;

@Configuration
public class HttpGatewayConfig {
    @Bean
    public IntegrationFlow httpInbound() {
        return IntegrationFlows
                .from(
                        Http
                                .inboundGateway("http-gateway")
                                .requestMapping(mapping -> mapping.methods(HttpMethod.POST))
                                .requestPayloadType(RequestMessage.class)
                                .get()
                )
                .channel("httpInboundChannel")
                .get();
    }

    @Bean
    public IntegrationFlow handler() {
        return IntegrationFlows
                .from("httpInboundChannel")
                .<RequestMessage>handle((payload, header) -> {
                    Logger logger = LoggerFactory.getLogger(HttpGatewayConfig.class);
                    header.entrySet().forEach(entry -> logger.info("header: name = {}, value = {}", entry.getKey(), entry.getValue()));

                    String message = payload.getRequest();
                    ResponseMessage response = new ResponseMessage();
                    response.setResponse("★★★ " + message + " ★★★");
                    return response;
                })
                .get();
    }
}

リクエストを受け付けてレスポンスを返すGatewayは、この部分ですね。

    @Bean
    public IntegrationFlow httpInbound() {
        return IntegrationFlows
                .from(
                        Http
                                .inboundGateway("http-gateway")
                                .requestMapping(mapping -> mapping.methods(HttpMethod.POST))
                                .requestPayloadType(RequestMessage.class)
                                .get()
                )
                .channel("httpInboundChannel")
                .get();
    }

http-gatewayというパスにマッピングすることにしました。

HttpRequestHandlingMessagingGatewayは、Java DSLHttpというクラスを使って組み立てています。

Http (Spring Integration 5.5.11 API)

HTTP Support / Configuring HTTP Endpoints with Java

リクエストの内容を扱っている部分は、先ほどのシンプルにGatewayを作成した場合と同じように(GenericHandler)作成しました。

    @Bean
    public IntegrationFlow handler() {
        return IntegrationFlows
                .from("httpInboundChannel")
                .<RequestMessage>handle((payload, header) -> {
                    Logger logger = LoggerFactory.getLogger(HttpGatewayConfig.class);
                    header.entrySet().forEach(entry -> logger.info("header: name = {}, value = {}", entry.getKey(), entry.getValue()));

                    String message = payload.getRequest();
                    ResponseMessage response = new ResponseMessage();
                    response.setResponse("★★★ " + message + " ★★★");
                    return response;
                })
                .get();
    }

再度起動。

$ mvn spring-boot:run

確認。

$ curl -H 'Content-Type: application/json' localhost:8080/http-gateway -d '{"request": "Hello World"}'
{"response":"★★★ Hello World ★★★"}

先ほどと同じ結果になりましたね。

ログ出力されたヘッダーの情報。

2022-05-01 21:22:46.391  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = content-length, value = 26
2022-05-01 21:22:46.391  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = http_requestMethod, value = POST
2022-05-01 21:22:46.391  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = replyChannel, value = org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@95d3903
2022-05-01 21:22:46.391  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = errorChannel, value = org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@95d3903
2022-05-01 21:22:46.391  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = host, value = localhost:8080
2022-05-01 21:22:46.391  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = http_requestUrl, value = http://localhost:8080/http-gateway
2022-05-01 21:22:46.392  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = id, value = 0ee3cfdb-b4e3-7500-d571-8f39e973fee1
2022-05-01 21:22:46.392  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = contentType, value = application/json;charset=UTF-8
2022-05-01 21:22:46.392  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = user-agent, value = curl/7.68.0
2022-05-01 21:22:46.392  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = accept, value = */*
2022-05-01 21:22:46.392  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = timestamp, value = 1651407766391

最後に少し捻りで、こちらのGenericHandlerを使った定義をコメントアウトして

    //@Bean
    public IntegrationFlow handler() {
        return IntegrationFlows
                .from("httpInboundChannel")
                .<RequestMessage>handle((payload, header) -> {
                    Logger logger = LoggerFactory.getLogger(HttpGatewayConfig.class);
                    header.entrySet().forEach(entry -> logger.info("header: name = {}, value = {}", entry.getKey(), entry.getValue()));

                    String message = payload.getRequest();
                    ResponseMessage response = new ResponseMessage();
                    response.setResponse("★★★ " + message + " ★★★");
                    return response;
                })
                .get();
    }

先ほど作成したインターフェースとGatewayを使ってみます。

    @Bean
    public IntegrationFlow delegate(EchoService echoService) {
        return IntegrationFlows
                .from("httpInboundChannel")
                .handle(echoService, "execute")
                .get();
    }

アプリケーションを起動。

$ mvn spring-boot:run

先ほどと同じ結果になりました。

$ curl -H 'Content-Type: application/json' localhost:8080/http-gateway -d '{"request": "Hello World"}'
{"response":"★★★ Hello World ★★★"}

ログは、最初に作成したConfigurationでのログになっていますね。

2022-05-01 21:34:47.562  INFO 24136 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = replyChannel, value = org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@56354fec
2022-05-01 21:34:47.563  INFO 24136 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = errorChannel, value = org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@56354fec
2022-05-01 21:34:47.564  INFO 24136 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = id, value = e710a00d-3ea4-1f55-38e9-8421c2fcec99
2022-05-01 21:34:47.564  INFO 24136 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = timestamp, value = 1651408487562

とりあえず、こんなところでしょうか。

まとめ

Spring IntegrationのGatewayを試してみました。

リクエスト、レスポンスを実現できるものだということは前回ドキュメントを見て雰囲気はわかっていましたが、いざ使おうとするとなかなか
入り方がわからなくて苦労しました…。

とりあえず、自分でGatewayを作ってみるのと、すでに実装済みのGatewayを使ってみるパターンはできたのでよしとしましょう。