CLOVER🍀

That was when it all began.

Trying out database polling with Spring Integration's JDBC support

What is this post for?

I heard that Spring Integration supports database polling over JDBC, so I gave it a quick try.

Polling and JDBC support in Spring Integration

Spring Integration has a polling feature.

Messaging Channels / Poller

Polling is used in use cases like the following, and databases are one of them.

  • Polling certain external systems, such as FTP Servers, Databases, and Web Services
  • Polling internal (pollable) message channels
  • Polling internal services (such as repeatedly executing methods on a Java class)

Messaging Channels / Pollable Message Source

Spring Integration also includes JDBC support.

JDBC Support

It provides an Inbound Channel Adapter, an Outbound Channel Adapter, an Outbound Gateway, and more.

This time I'd like to use the Inbound Channel Adapter and the Outbound Channel Adapter.

Environment

Here is the environment used this time.

$ java --version
openjdk 17.0.3 2022-04-19
OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-109-generic", arch: "amd64", family: "unix"

MySQL is the database to be polled.

$ mysql --version
mysql  Ver 8.0.29 for Linux on x86_64 (MySQL Community Server - GPL)

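MySQL is assumed to be running at 172.17.0.2.

The task

The task this time is as follows.

  • Use the JDBC support's Inbound Channel Adapter to poll the data registered in a table at 10-second intervals
    • The table has a status column; records with status 0 are the processing targets, and the status is set to 1 once they have been processed

The table is defined as follows.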

create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);
  • For the data retrieved by polling, do the following
    • Write it to standard output
    • Use the JDBC support's Outbound Channel Adapter to write it to another table

The other table is defined as follows.

create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);

I'll try the two kinds of output one at a time.

Creating the project

First, create a Spring Boot project with Spring Initializr.

The dependencies to add are integration, jdbc, and mysql.

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.6.7 \
  -d javaVersion=17 \
  -d name=integration-jdbc-polling \
  -d groupId=org.littlewings \
  -d artifactId=integration-jdbc-polling \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.integration \
  -d dependencies=integration,jdbc,mysql \
  -d baseDir=integration-jdbc-polling | tar zxvf -

Move into the project.

$ cd integration-jdbc-polling

Delete the generated source code for now.

$ rm src/main/java/org/littlewings/spring/integration/IntegrationJdbcPollingApplication.java src/test/java/org/littlewings/spring/integration/IntegrationJdbcPollingApplicationTests.java

Here are the Maven dependencies and plugin settings.

        <properties>
                <java.version>17</java.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-integration</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-jdbc</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-jdbc</artifactId>
                </dependency>

                <dependency>
                        <groupId>mysql</groupId>
                        <artifactId>mysql-connector-java</artifactId>
                        <scope>runtime</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.integration</groupId>
                        <artifactId>spring-integration-test</artifactId>
                        <scope>test</scope>
                </dependency>
        </dependencies>

        <build>
                <plugins>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
                        </plugin>
                </plugins>
        </build>

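The key point this time is spring-integration-jdbc.

Incidentally, the first time I tried Spring Integration I used the Redis Inbound Channel Adapter, and back then the application exited unless web (that is, a server) was present, so I ended up adding it. This time that did not seem to be necessary.

Trying out Spring Integration - CLOVER🍀

First, create just the main class.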

src/main/java/org/littlewings/spring/integration/App.java

package org.littlewings.spring.integration;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

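Polling the database and writing the retrieved data to standard output

Let's start with this part.

  • Use the JDBC support's Inbound Channel Adapter to poll the data registered in a table at 10-second intervals
    • The table has a status column; records with status 0 are the processing targets, and the status is set to 1 once they have been processed

Define the table in schema.sql.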

src/main/resources/schema.sql

drop table if exists polling_message;

create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);

Also create a class that corresponds to the table.

src/main/java/org/littlewings/spring/integration/PollingMessage.java

package org.littlewings.spring.integration;

public class PollingMessage {

    Integer id;
    String messageText;
    Integer processStatus;

    // getters/setters omitted
}

This class is not strictly required this time, but I'll map the polled results to it.

And here is the flow I put together.

src/main/java/org/littlewings/spring/integration/JdbcPollingConfig.java

package org.littlewings.spring.integration;

import java.time.Duration;
import java.util.List;
import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.messaging.MessageHandler;

@Configuration
public class JdbcPollingConfig {
    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

        return jdbcPollingChannelAdapter;
    }

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

    @Bean
    public IntegrationFlow stdout() {
        Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
                })
                .get();
    }
}

For the Inbound Channel Adapter part, I referred to this section of the documentation.

JDBC Support / Inbound Channel Adapter

It is written like this.

    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

        return jdbcPollingChannelAdapter;
    }

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

Going through it in order.

Use JdbcPollingChannelAdapter, specifying the DataSource to access and the SQL to run when polling.

        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

This is the update SQL that runs after polling. It is where the status gets changed.

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");

A parameter prefixed with : refers to a value from each row retrieved by polling, and it relies on a Spring JDBC feature.

The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which, in the preceding example, is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC, combined with a convention (projection onto the polled result list) adopted in Spring Integration.

JDBC Support / Inbound Channel Adapter

Incidentally, multiple values appear to be passed in, which is why the where clause uses in.
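To make the "projection onto the polled result list" a bit more concrete, here is a small plain-Java sketch of the effect. It is only an illustration and not Spring's implementation: the class InClauseProjectionSketch and its methods project and expand are names I made up, and rows are modeled as simple maps.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative only: mimics the effect of projecting ":id" onto the polled rows.
// This is NOT how Spring Integration or Spring JDBC is implemented.
class InClauseProjectionSketch {

    // Collects the value of one column from every polled row.
    static List<Object> project(List<Map<String, Object>> rows, String column) {
        return rows.stream().map(row -> row.get(column)).collect(Collectors.toList());
    }

    // Replaces ":name" with one "?" placeholder per projected value.
    static String expand(String sql, String name, int valueCount) {
        String placeholders = String.join(", ", Collections.nCopies(valueCount, "?"));
        return sql.replace(":" + name, placeholders);
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(
                Map.<String, Object>of("id", 1),
                Map.<String, Object>of("id", 3),
                Map.<String, Object>of("id", 4));
        List<Object> ids = project(rows, "id");
        String sql = "update polling_message set process_status = 1 where id in (:id)";
        // prints: update polling_message set process_status = 1 where id in (?, ?, ?)
        System.out.println(expand(sql, "id", ids.size()));
        // prints: [1, 3, 4]
        System.out.println(ids);
    }
}
```

With three polled rows, the single :id parameter ends up as three placeholders, one per row.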

Then the results are mapped to the class created earlier.

        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

Here is the configuration for the polling itself.

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

It takes the JdbcPollingChannelAdapter from above, sets the polling interval to 10 seconds, and enables transactions.

                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
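As a rough mental model of fixed-rate polling, the same idea can be sketched with only the JDK's ScheduledExecutorService. This is an analogy under my own assumptions, not what Pollers.fixedRate does internally; FixedRatePollSketch and runPolls are hypothetical names, and the interval is shortened so the demo finishes quickly.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only: a JDK-level analogy for polling at a fixed rate.
class FixedRatePollSketch {

    // Runs pollTask at a fixed rate until it has run `times` times or the timeout expires.
    // Returns true if all runs completed within the timeout.
    static boolean runPolls(Runnable pollTask, Duration interval, int times, Duration timeout) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(times);
        try {
            // Each run starts `interval` after the previous start, on a single scheduler thread.
            scheduler.scheduleAtFixedRate(() -> {
                pollTask.run();
                remaining.countDown();
            }, 0, interval.toMillis(), TimeUnit.MILLISECONDS);
            return remaining.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean completed = runPolls(
                () -> System.out.println("poll on " + Thread.currentThread().getName()),
                Duration.ofMillis(100), 3, Duration.ofSeconds(5));
        System.out.println("completed = " + completed);
    }
}
```

Because a single scheduler thread is used here, every poll runs on the same thread, similar to the single scheduling thread name that shows up in the application log.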

With transactions enabled, the select and the update are executed in the same transaction.

the update and select queries are both executed in the same transaction.

Also, when the downstream channel is a DirectChannel (the default), the endpoints run on the same thread, so they share the same transaction as well.

A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread and, hence, the same transaction. That way, if any of them fail, the transaction rolls back and the input data is reverted to its original state.

In this case, if something fails downstream, the whole thing is rolled back.

JDBC Support / Inbound Channel Adapter / Polling and Transactions
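The rollback behavior described in the quote can be pictured with a toy in-memory model. This is plain Java with no Spring or database involved, so treat it purely as a sketch: SameThreadTransactionSketch and pollOnce are made-up names, and the "table" is just a map from id to process_status.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory model, NOT Spring code: "table" maps id -> process_status.
class SameThreadTransactionSketch {

    // Selects ids with status 0, marks them 1 in a working copy, then calls the downstream
    // handler on the calling thread. The working copy replaces the table ("commit") only if
    // the handler returns normally; otherwise it is discarded ("rollback").
    static boolean pollOnce(Map<Integer, Integer> table, Consumer<List<Integer>> downstream) {
        Map<Integer, Integer> working = new LinkedHashMap<>(table);
        List<Integer> polledIds = new ArrayList<>();
        working.forEach((id, status) -> {
            if (status == 0) {
                polledIds.add(id);
            }
        });
        if (polledIds.isEmpty()) {
            return true; // nothing to process, nothing to change
        }
        polledIds.forEach(id -> working.put(id, 1));
        try {
            downstream.accept(polledIds);
        } catch (RuntimeException e) {
            return false; // rollback: the table keeps its original state
        }
        table.clear();
        table.putAll(working); // commit
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> table = new LinkedHashMap<>();
        table.put(1, 0);
        table.put(2, 9);
        table.put(3, 0);

        pollOnce(table, ids -> { throw new RuntimeException("oops"); });
        System.out.println(table); // prints: {1=0, 2=9, 3=0}

        pollOnce(table, ids -> System.out.println("processing " + ids));
        System.out.println(table); // prints: {1=1, 2=9, 3=1}
    }
}
```

The point of the model is only that the status change and the downstream processing succeed or fail together when they run in one unit of work on one thread.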

The Java DSL documentation is also a useful reference for how to write an Inbound Channel Adapter.

Java DSL / Inbound Channel Adapters

Next, the part that writes to standard output.

    @Bean
    public IntegrationFlow stdout() {
        Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
                })
                .get();
    }

Because a BeanPropertyRowMapper is set on the JdbcPollingChannelAdapter, the query results can be handled as PollingMessage instances, although they arrive as a List.

                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
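The cast above is unchecked, which is fine for this experiment. If you wanted the payload type verified at runtime, one possible approach is a small helper like the following. It is a sketch of my own; PayloadCastSketch and asListOf are hypothetical names and not part of Spring Integration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper: converts an Object payload into a typed List with runtime checks.
class PayloadCastSketch {

    // Throws IllegalArgumentException if the payload is not a List,
    // and ClassCastException if any element is not of the expected type.
    static <T> List<T> asListOf(Object payload, Class<T> elementType) {
        if (!(payload instanceof List<?>)) {
            throw new IllegalArgumentException("payload is not a List: " + payload);
        }
        List<T> result = new ArrayList<>();
        for (Object element : (List<?>) payload) {
            result.add(elementType.cast(element));
        }
        return result;
    }

    public static void main(String[] args) {
        Object payload = List.of("Hello World!!", "Hello Spring Boot");
        List<String> texts = asListOf(payload, String.class);
        System.out.println(texts); // prints: [Hello World!!, Hello Spring Boot]
    }
}
```

In the handler, the equivalent call would pass message.getPayload() and PollingMessage.class.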

After that, it just logs the messages.

                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );

I used the commented-out lines to look at the stack trace and to throw an exception to confirm that the transaction is rolled back.

Configuration.

src/main/resources/application.properties

spring.datasource.url=jdbc:mysql://172.17.0.2:3306/practice?characterEncoding=utf-8
spring.datasource.username=kazuhira
spring.datasource.password=password

spring.sql.init.mode=always

logging.level.org.springframework.jdbc.core=debug
logging.level.org.springframework.jdbc.support=debug

I set it up to log the SQL being executed and what the transactions are doing.

I also made it register data at startup, including rows that should be excluded from polling.

src/main/resources/data.sql

insert into polling_message(message_text, process_status) values('Hello World!!', 0);
insert into polling_message(message_text, process_status) values('[ignore] こんにちは、世界', 9);
insert into polling_message(message_text, process_status) values('Hello Spring Integration', 0);
insert into polling_message(message_text, process_status) values('Hello Spring Boot', 0);
insert into polling_message(message_text, process_status) values('[ignore] Hello MySQL!!', 9);

Now, let's run it.

Package it,

$ mvn package

and run it.

$ java -jar target/integration-jdbc-polling-0.0.1-SNAPSHOT.jar

The state of the table right after startup.

mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              0 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              0 |
|  4 | Hello Spring Boot                 |              0 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)

Let's look at the log.

2022-05-04 03:05:23.377 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:05:23.378 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:05:23.380 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:05:23.382  INFO 69669 --- [           main] org.littlewings.spring.integration.App   : Started App in 3.857 seconds (JVM running for 4.49)
2022-05-04 03:05:23.387 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:05:23.388 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:05:23.449 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:05:23.459 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:05:23.459 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:05:23.470 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:05:23.471 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:05:23.497  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 1, message_text = Hello World!!, process_status = 0
2022-05-04 03:05:23.500  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 3, message_text = Hello Spring Integration, process_status = 0
2022-05-04 03:05:23.500  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 4, message_text = Hello Spring Boot, process_status = 0
2022-05-04 03:05:23.501 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:05:23.502 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:05:23.523 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction
2022-05-04 03:05:33.363 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:05:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:05:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:05:33.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:05:33.367 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:05:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:05:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:05:33.370 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction

The transaction starts.

2022-05-04 03:05:23.377 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:05:23.378 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction

The select statement is executed.

2022-05-04 03:05:23.387 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:05:23.388 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]

The update statement is executed.

2022-05-04 03:05:23.470 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:05:23.471 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]

The retrieved records are output.

2022-05-04 03:05:23.497  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 1, message_text = Hello World!!, process_status = 0
2022-05-04 03:05:23.500  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 3, message_text = Hello Spring Integration, process_status = 0
2022-05-04 03:05:23.500  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 4, message_text = Hello Spring Boot, process_status = 0

You can follow everything up to the transaction commit.

2022-05-04 03:05:23.501 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:05:23.502 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]

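Judging from the thread name, the whole sequence runs on the same thread.

After that, the select statement is executed every 10 seconds, but since the data has already been updated there are no rows left to process.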

2022-05-04 03:05:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:05:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:05:33.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:05:33.367 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:05:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:05:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:05:33.370 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction

Looking at the table, it is now in this state.

mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              1 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              1 |
|  4 | Hello Spring Boot                 |              1 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)

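The select statement used for polling only targets rows whose process_status is 0, so the records that were set to 9 remain untouched.

Next, let's add some data, again including rows that should not be processed.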

mysql> insert into polling_message(message_text, process_status) values('Spring Integration JDBC Support', 0);
Query OK, 1 row affected (0.03 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] Hello InnoDB', 9);
Query OK, 1 row affected (0.03 sec)

mysql> insert into polling_message(message_text, process_status) values('Hello MySQL Connector/J', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] はじめてのSpring Integration', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('JDBCでデータベースポーリング', 0);
Query OK, 1 row affected (0.02 sec)

The data is then picked up at the next poll and written to standard output.

2022-05-04 03:08:23.363 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:08:23.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:08:23.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:08:23.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:08:23.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:08:23.368 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:08:23.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:08:23.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:08:23.370 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:08:23.370 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:08:23.372  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 6, message_text = Spring Integration JDBC Support, process_status = 0
2022-05-04 03:08:23.372  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 8, message_text = Hello MySQL Connector/J, process_status = 0
2022-05-04 03:08:23.372  INFO 69669 --- [   scheduling-1] o.l.s.integration.JdbcPollingConfig      : process message: id = 10, message_text = JDBCでデータベースポーリング, process_status = 0
2022-05-04 03:08:23.372 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:08:23.372 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:08:23.462 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction

After that, there is once again no data to process.

2022-05-04 03:08:33.364 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:08:33.365 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:08:33.366 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:08:33.367 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:08:33.367 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:08:33.368 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:08:33.368 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:08:33.369 DEBUG 69669 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction

The state of the data.

mysql> select * from polling_message;
+----+--------------------------------------------+----------------+
| id | message_text                               | process_status |
+----+--------------------------------------------+----------------+
|  1 | Hello World!!                              |              1 |
|  2 | [ignore] こんにちは、世界                  |              9 |
|  3 | Hello Spring Integration                   |              1 |
|  4 | Hello Spring Boot                          |              1 |
|  5 | [ignore] Hello MySQL!!                     |              9 |
|  6 | Spring Integration JDBC Support            |              1 |
|  7 | [ignore] Hello InnoDB                      |              9 |
|  8 | Hello MySQL Connector/J                    |              1 |
|  9 | [ignore] はじめてのSpring Integration      |              9 |
| 10 | JDBCでデータベースポーリング               |              1 |
+----+--------------------------------------------+----------------+
10 rows in set (0.00 sec)

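Looks good.

Polling the database and writing the retrieved data to a table

Next, let's poll the table again, but this time write the retrieved data to another table.

We'll use the Outbound Channel Adapter.

JDBC Support / Outbound Channel Adapter

The destination table is defined like this.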

create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);

First, comment out the @Bean annotation on the flow defined earlier in the flow configuration class.

    // @Bean
    public IntegrationFlow stdout() {

Then add a bean definition for JdbcMessageHandler and assemble a flow that uses it.

    @Bean
    public MessageHandler jdbcMessageHandler(DataSource dataSource) {
        JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(
                dataSource,
                """
                        insert into processed_message_log(message_text) 
                        values(:payload.messageText)
                        """
        );
        return jdbcMessageHandler;
    }

    @Bean
    public IntegrationFlow writeTable() {
        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(jdbcMessageHandler(null))
                .get();
    }

A note on the :payload.messageText notation in the insert statement.

                """
                        insert into processed_message_log(message_text) 
                        values(:payload.messageText)
                        """

It refers to Message#getPayload.

Message (Spring Framework 5.3.19 API)

Because a BeanPropertyRowMapper is set on JdbcPollingChannelAdapter, the payload holds instances of the PollingMessage class created earlier, so the expression ends up calling PollingMessage#getMessageText.

    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

        return jdbcPollingChannelAdapter;
    }
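
To make the getter chain concrete, here is a minimal plain-Java sketch of walking a dotted path such as payload.messageText. It is not how Spring Integration resolves the parameter internally; it only mirrors the getPayload then getMessageText sequence described above. PropertyPathSketch, FakeMessage, and the cut-down PollingMessage are hypothetical stand-ins so the example runs without Spring.

```java
import java.lang.reflect.Method;

final class PropertyPathSketch {
    // Hypothetical, cut-down stand-in for the PollingMessage class created earlier.
    public static final class PollingMessage {
        private final String messageText;

        public PollingMessage(String messageText) {
            this.messageText = messageText;
        }

        public String getMessageText() {
            return messageText;
        }
    }

    // Hypothetical stand-in for org.springframework.messaging.Message.
    public static final class FakeMessage {
        private final Object payload;

        public FakeMessage(Object payload) {
            this.payload = payload;
        }

        public Object getPayload() {
            return payload;
        }
    }

    // Walks a dotted path by calling the matching public getter for each segment.
    static Object resolve(Object root, String path) {
        Object current = root;
        for (String name : path.split("\\.")) {
            String getter = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                Method method = current.getClass().getMethod(getter);
                current = method.invoke(current);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("cannot resolve '" + name + "' in " + path, e);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        FakeMessage message = new FakeMessage(new PollingMessage("Hello World!!"));
        // Calls getPayload() on the message, then getMessageText() on the payload.
        System.out.println(resolve(message, "payload.messageText"));
    }
}
```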

Note that in this case the records appear to be processed one at a time.

The JdbcMessageHandler defined above is registered as the handler of the flow.

    @Bean
    public IntegrationFlow writeTable() {
        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(jdbcMessageHandler(null))
                .get();
    }

schema.sql is changed as follows. The tables are recreated, so the results of the earlier run are cleared.

src/main/resources/schema.sql

drop table if exists polling_message;

create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);

drop table if exists processed_message_log;

create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);

Now let's check that it works.

Package the application and start it.

$ mvn package
$ java -jar target/integration-jdbc-polling-0.0.1-SNAPSHOT.jar

The log looked like this.

2022-05-04 03:18:33.300 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:18:33.301 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:18:33.303 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:18:33.308 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:18:33.308 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:18:33.310  INFO 70288 --- [           main] org.littlewings.spring.integration.App   : Started App in 3.268 seconds (JVM running for 3.712)
2022-05-04 03:18:33.348 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:18:33.350 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:18:33.350 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:18:33.357 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:18:33.358 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:18:33.371 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing SQL batch update [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:18:33.372 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:18:33.373 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcUtils               : JDBC driver supports batch updates
2022-05-04 03:18:33.389 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:18:33.390 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:18:33.423 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction
2022-05-04 03:18:43.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:18:43.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:18:43.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:18:43.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:18:43.297 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:18:43.299 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:18:43.299 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:18:43.300 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction

The difference from before is that the retrieved results are no longer written to standard output; instead, an insert statement is executed.

2022-05-04 03:18:33.371 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing SQL batch update [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:18:33.372 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:18:33.373 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcUtils               : JDBC driver supports batch updates

It appears to perform a batch update.
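
The "Executing SQL batch update" log line suggests that the polled rows become one parameter set each for the same insert statement. Here is a minimal sketch of that shape in plain Java. It is not the handler's actual code, and BatchArgsSketch and toBatchArgs are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchArgsSketch {
    // The statement as it appears in the log, with a positional placeholder.
    static final String INSERT_SQL =
            "insert into processed_message_log(message_text) values(?)";

    // Builds one parameter set (Object[]) per polled message text.
    static List<Object[]> toBatchArgs(List<String> messageTexts) {
        List<Object[]> batchArgs = new ArrayList<>();
        for (String text : messageTexts) {
            batchArgs.add(new Object[] {text});
        }
        return batchArgs;
    }

    public static void main(String[] args) {
        List<String> polled = new ArrayList<>();
        polled.add("Hello World!!");
        polled.add("Hello Spring Integration");
        polled.add("Hello Spring Boot");

        List<Object[]> batchArgs = toBatchArgs(polled);
        System.out.println(INSERT_SQL + " with " + batchArgs.size() + " parameter sets");
    }
}
```

A list in this shape is what JdbcTemplate#batchUpdate(String, List<Object[]>) accepts, so the three rows go to the driver as one batch rather than as three separate statements.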

The commit comes after the insert statement.

2022-05-04 03:18:33.389 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:18:33.390 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
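
Since the select, the update, and the insert all run on the same connection before the commit, a failure in the handler should leave process_status untouched. The following in-memory sketch illustrates that ordering under the assumption, suggested by the log, that the handler runs inside the poller's transaction. PollTransactionSketch and the array-backed "table" are hypothetical, and no real database or Spring transaction manager is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class PollTransactionSketch {
    // statuses[i] plays the role of process_status for row i.
    // Returns true when the poll "committed", false when it "rolled back".
    static boolean pollOnce(int[] statuses, Consumer<List<Integer>> handler) {
        int[] working = statuses.clone();   // changes stay private until the commit
        List<Integer> selected = new ArrayList<>();
        for (int i = 0; i < working.length; i++) {
            if (working[i] == 0) {          // select ... where process_status = 0
                selected.add(i);
                working[i] = 1;             // update ... set process_status = 1
            }
        }
        try {
            if (!selected.isEmpty()) {
                handler.accept(selected);   // downstream handler, e.g. the insert
            }
        } catch (RuntimeException e) {
            return false;                   // rollback: the working copy is discarded
        }
        System.arraycopy(working, 0, statuses, 0, working.length); // commit
        return true;
    }

    public static void main(String[] args) {
        int[] statuses = {0, 9, 0};

        // A failing handler leaves every status as it was.
        boolean committed = pollOnce(statuses, ids -> {
            throw new IllegalStateException("oops");
        });
        System.out.println(committed + " " + Arrays.toString(statuses));

        // A successful handler lets the status change become visible.
        committed = pollOnce(statuses, ids -> { });
        System.out.println(committed + " " + Arrays.toString(statuses));
    }
}
```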

Let's check the data.

At the start, the table is in this state:

mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              0 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              0 |
|  4 | Hello Spring Boot                 |              0 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)

After processing, the polling_message table looks the same as before:

mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              1 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              1 |
|  4 | Hello Spring Boot                 |              1 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)

In addition, we can confirm that JdbcMessageHandler inserted the processed rows into the other table.

mysql> select * from processed_message_log;
+--------+--------------------------+
| log_id | message_text             |
+--------+--------------------------+
|      1 | Hello World!!            |
|      2 | Hello Spring Integration |
|      3 | Hello Spring Boot        |
+--------+--------------------------+
3 rows in set (0.00 sec)
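
The before and after states above can be reproduced with a small in-memory simulation of one polling cycle: pick rows with process_status = 0, mark them 1, and append their text to the log, while rows with status 9 are left alone. This is only a sketch of the observed behavior. PollingCycleSketch, Row, and pollOnce are hypothetical names, and the lists stand in for the two MySQL tables.

```java
import java.util.ArrayList;
import java.util.List;

final class PollingCycleSketch {
    // Hypothetical in-memory stand-in for one polling_message row.
    static final class Row {
        final int id;
        final String messageText;
        int processStatus;

        Row(int id, String messageText, int processStatus) {
            this.id = id;
            this.messageText = messageText;
            this.processStatus = processStatus;
        }
    }

    // One poll: process rows whose status is 0 and return how many were handled.
    static int pollOnce(List<Row> pollingMessage, List<String> processedMessageLog) {
        int processed = 0;
        for (Row row : pollingMessage) {
            if (row.processStatus == 0) {
                row.processStatus = 1;                      // mark as processed
                processedMessageLog.add(row.messageText);   // write to the log table
                processed++;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, "Hello World!!", 0));
        table.add(new Row(2, "[ignore] row", 9));
        table.add(new Row(3, "Hello Spring Integration", 0));
        table.add(new Row(4, "Hello Spring Boot", 0));
        table.add(new Row(5, "[ignore] Hello MySQL!!", 9));

        List<String> log = new ArrayList<>();
        System.out.println("first poll processed " + pollOnce(table, log));
        System.out.println("second poll processed " + pollOnce(table, log));
        System.out.println(log);
    }
}
```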

Add data the same way as before.

mysql> insert into polling_message(message_text, process_status) values('Spring Integration JDBC Support', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] Hello InnoDB', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('Hello MySQL Connector/J', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] はじめてのSpring Integration', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('JDBCでデータベースポーリング', 0);
Query OK, 1 row affected (0.02 sec)

The log:

2022-05-04 03:24:03.294 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:24:03.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:24:03.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:24:03.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:24:03.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:24:03.298 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:24:03.298 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:24:03.299 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper      : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:24:03.300 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL update
2022-05-04 03:24:03.300 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing SQL batch update [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcUtils               : JDBC driver supports batch updates
2022-05-04 03:24:03.306 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:24:03.306 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:24:03.662 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction
2022-05-04 03:24:13.294 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:24:13.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Acquired Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:24:13.295 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Switching JDBC Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:24:13.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL query
2022-05-04 03:24:13.296 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [select
  id, message_text, process_status
from
  polling_message
where
  process_status = 0
]
2022-05-04 03:24:13.297 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Initiating transaction commit
2022-05-04 03:24:13.297 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Committing JDBC transaction on Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:24:13.298 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcTransactionManager  : Releasing JDBC Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction

The newly inserted rows were detected and processed.

2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing SQL batch update [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.core.JdbcTemplate               : Executing prepared SQL statement [insert into processed_message_log(message_text)
values(?)
]
2022-05-04 03:24:03.302 DEBUG 70288 --- [   scheduling-1] o.s.jdbc.support.JdbcUtils               : JDBC driver supports batch updates

The data has changed as well.

mysql> select * from polling_message;
+----+--------------------------------------------+----------------+
| id | message_text                               | process_status |
+----+--------------------------------------------+----------------+
|  1 | Hello World!!                              |              1 |
|  2 | [ignore] こんにちは、世界                  |              9 |
|  3 | Hello Spring Integration                   |              1 |
|  4 | Hello Spring Boot                          |              1 |
|  5 | [ignore] Hello MySQL!!                     |              9 |
|  6 | Spring Integration JDBC Support            |              1 |
|  7 | [ignore] Hello InnoDB                      |              9 |
|  8 | Hello MySQL Connector/J                    |              1 |
|  9 | [ignore] はじめてのSpring Integration      |              9 |
| 10 | JDBCでデータベースポーリング               |              1 |
+----+--------------------------------------------+----------------+
10 rows in set (0.00 sec)

mysql> select * from processed_message_log;
+--------+------------------------------------------+
| log_id | message_text                             |
+--------+------------------------------------------+
|      1 | Hello World!!                            |
|      2 | Hello Spring Integration                 |
|      3 | Hello Spring Boot                        |
|      4 | Spring Integration JDBC Support          |
|      5 | Hello MySQL Connector/J                  |
|      6 | JDBCでデータベースポーリング             |
+--------+------------------------------------------+
6 rows in set (0.00 sec)

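That confirms the Outbound Channel Adapter as well.

Summary

I tried out database polling using Spring Integration's JDBC support.

I struggled a little with expressing the JDBC support features in the Java DSL, but once I figured that out, getting it running was fairly straightforward.

I feel like I'm gradually getting used to it.

Finally, here is the full class that defines the flows, in the version with the Outbound Channel Adapter enabled.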

src/main/java/org/littlewings/spring/integration/JdbcPollingConfig.java

package org.littlewings.spring.integration;

import java.time.Duration;
import java.util.List;
import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.messaging.MessageHandler;

@Configuration
public class JdbcPollingConfig {
    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );

        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

        return jdbcPollingChannelAdapter;
    }

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

    // @Bean
    public IntegrationFlow stdout() {
        Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
                })
                .get();
    }

    @Bean
    public MessageHandler jdbcMessageHandler(DataSource dataSource) {
        JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(
                dataSource,
                """
                        insert into processed_message_log(message_text)
                        values(:payload.messageText)
                        """
        );
        return jdbcMessageHandler;
    }

    @Bean
    public IntegrationFlow writeTable() {
        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(jdbcMessageHandler(null))
                .get();
    }
}