What is this post about?
I heard that Spring Integration supports database polling over JDBC, so I gave it a quick try.
Polling and JDBC support in Spring Integration
Spring Integration provides a polling mechanism.
The documentation lists several use cases for polling, and a database is one of them.
Messaging Channels / Pollable Message Source
Spring Integration also includes JDBC support.
It provides an Inbound Channel Adapter, an Outbound Channel Adapter, an Outbound Gateway, and more.
This time, I'll use the Inbound Channel Adapter and the Outbound Channel Adapter.
Environment
Here is the environment used this time.
$ java --version
openjdk 17.0.3 2022-04-19
OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)

$ mvn --version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-109-generic", arch: "amd64", family: "unix"
MySQL is the database to be polled.
$ mysql --version
mysql Ver 8.0.29 for Linux on x86_64 (MySQL Community Server - GPL)
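MySQL is assumed to be running at 172.17.0.2.
The task
The task for this post is as follows.
- Use the Inbound Channel Adapter from the JDBC support to poll data registered in a table every 10 seconds
  - The table has a status column; records with status 0 are the processing targets, and the status is set to 1 once processing finishes
The table definition is as follows.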
create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);
- Do the following with the data retrieved by polling
  - Write it to standard output
  - Write it to another table using the Outbound Channel Adapter from the JDBC support
The other table is defined as follows.
create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);
I'll try the two output methods one at a time.
Creating the project
First, create a Spring Boot project with Spring Initializr.
Add integration, jdbc, and mysql as dependencies.
$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.6.7 \
  -d javaVersion=17 \
  -d name=integration-jdbc-polling \
  -d groupId=org.littlewings \
  -d artifactId=integration-jdbc-polling \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.integration \
  -d dependencies=integration,jdbc,mysql \
  -d baseDir=integration-jdbc-polling | tar zxvf -
Move into the project.
$ cd integration-jdbc-polling
Delete the generated source code for now.
$ rm src/main/java/org/littlewings/spring/integration/IntegrationJdbcPollingApplication.java src/test/java/org/littlewings/spring/integration/IntegrationJdbcPollingApplicationTests.java
<properties>
    <java.version>17</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
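The key point this time is spring-integration-jdbc.
Incidentally, when I first tried Spring Integration I used the Redis Inbound Channel Adapter, and back then the application exited unless web was present (that is, unless a server was running), so I had to add it. That did not seem to be necessary this time.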
Spring Integrationを試してみる - CLOVER🍀
First, create just the main class.
src/main/java/org/littlewings/spring/integration/App.java
package org.littlewings.spring.integration;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}
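Polling the database and writing the retrieved data to standard output
Let's start with this part.
- Use the Inbound Channel Adapter from the JDBC support to poll data registered in a table every 10 seconds
  - The table has a status column; records with status 0 are the processing targets, and the status is set to 1 once processing finishes
Define the table in schema.sql.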
src/main/resources/schema.sql
drop table if exists polling_message;

create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);
Also create a class that corresponds to the table.
src/main/java/org/littlewings/spring/integration/PollingMessage.java
package org.littlewings.spring.integration;

public class PollingMessage {
    Integer id;
    String messageText;
    Integer processStatus;

    // getters/setters omitted
}
This class is not strictly required here, but I'll map the polled results to it.
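Since the accessors are omitted above, here is a minimal sketch of what the complete bean could look like, together with the column-to-property naming convention it relies on (message_text becomes messageText). The class name PollingMessageBean and the helper ColumnNaming.toPropertyName are hypothetical names used only for illustration; the helper is a simplified stand-in and not Spring's actual mapping code.

```java
import java.util.Locale;

// Illustrative, complete version of the POJO (accessors written out).
class PollingMessageBean {
    private Integer id;
    private String messageText;
    private Integer processStatus;

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getMessageText() { return messageText; }
    public void setMessageText(String messageText) { this.messageText = messageText; }

    public Integer getProcessStatus() { return processStatus; }
    public void setProcessStatus(Integer processStatus) { this.processStatus = processStatus; }
}

// Hypothetical helper: NOT Spring code, it only demonstrates the
// underscore-to-camel-case convention between column and property names.
class ColumnNaming {
    static String toPropertyName(String columnName) {
        StringBuilder sb = new StringBuilder();
        boolean upperNext = false;
        for (char c : columnName.toLowerCase(Locale.ROOT).toCharArray()) {
            if (c == '_') {
                upperNext = true; // the next character starts a new word
                continue;
            }
            sb.append(upperNext ? Character.toUpperCase(c) : c);
            upperNext = false;
        }
        return sb.toString();
    }
}
```

A row mapper based on bean properties needs a no-argument constructor and setters like these in order to populate the object.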
And here is the flow I put together.
src/main/java/org/littlewings/spring/integration/JdbcPollingConfig.java
package org.littlewings.spring.integration;

import java.time.Duration;
import java.util.List;
import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.messaging.MessageHandler;

@Configuration
public class JdbcPollingConfig {
    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select id, message_text, process_status
                                from polling_message
                                where process_status = 0
                                """
                );
        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));
        return jdbcPollingChannelAdapter;
    }

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

    @Bean
    public IntegrationFlow stdout() {
        Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
                })
                .get();
    }
}
For the Inbound Channel Adapter part, I referred to this section of the documentation.
JDBC Support / Inbound Channel Adapter
It is written like this.
    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select id, message_text, process_status
                                from polling_message
                                where process_status = 0
                                """
                );
        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));
        return jdbcPollingChannelAdapter;
    }

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }
Going through it in order:
JdbcPollingChannelAdapter is created with the DataSource to access and the SQL to use when polling.
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select id, message_text, process_status
                                from polling_message
                                where process_status = 0
                                """
                );
This is the update SQL executed after polling. It changes the status.
jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
A parameter prefixed with : refers to a value taken from each row retrieved by polling; this relies on a Spring JDBC feature.
The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which, in the preceding example, is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC, combined with a convention (projection onto the polled result list) adopted in Spring Integration.
JDBC Support / Inbound Channel Adapter
Since multiple values appear to be passed in, the where clause uses in.
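To make the in (:id) part concrete, the following is a simplified, JDK-only sketch of how a single named parameter bound to several values can end up as one ? placeholder per value, which matches the in (?, ?, ?) form visible in the debug log. The class InClauseSketch and its expand method are hypothetical; this is a naive string replacement for illustration, not how Spring JDBC actually parses SQL.

```java
import java.util.Collections;
import java.util.List;

class InClauseSketch {

    // Hypothetical helper: replaces ":name" with one "?" per value.
    // Naive on purpose: it does not handle quoting, repeated prefixes such as
    // ":id2", or empty lists (which would produce an invalid "in ()").
    static String expand(String sql, String name, List<?> values) {
        String placeholders = String.join(", ", Collections.nCopies(values.size(), "?"));
        return sql.replace(":" + name, placeholders);
    }

    public static void main(String[] args) {
        String sql = "update polling_message set process_status = 1 where id in (:id)";
        // The ids of the three rows returned by the select in this post.
        System.out.println(expand(sql, "id", List.of(1, 3, 4)));
        // prints: update polling_message set process_status = 1 where id in (?, ?, ?)
    }
}
```

The values themselves would then be bound to the placeholders in order, so the update touches exactly the rows that the preceding select returned.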
Then the results are mapped to the class created earlier.
jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));
Here is the polling configuration itself.
    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }
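It combines the JdbcPollingChannelAdapter from above with a 10-second polling interval, and enables transactions. The null passed to jdbcMessageSource(null) is only a placeholder: inside a @Configuration class the call is intercepted and returns the bean managed by the container.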
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
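As a rough, JDK-only analogy for what a fixed-rate poller does, the sketch below runs a task repeatedly on a single scheduler thread. FixedRatePollSketch and pollTimes are hypothetical names, and this is not how Spring Integration schedules its pollers internally; it only illustrates the idea of one task being triggered at a fixed rate on one thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class FixedRatePollSketch {

    // Runs pollTask at a fixed rate on a single scheduler thread and returns
    // true once it has completed `times` runs (false on timeout or interrupt).
    static boolean pollTimes(Runnable pollTask, int times, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(times);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                pollTask.run();
                remaining.countDown();
            }, 0L, periodMillis, TimeUnit.MILLISECONDS);
            return remaining.await(periodMillis * times + 5_000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // 10_000L would correspond to the 10-second interval; 100 ms keeps the demo short.
        boolean done = pollTimes(
                () -> System.out.println("poll on " + Thread.currentThread().getName()), 3, 100L);
        System.out.println("completed: " + done);
    }
}
```

Because every run happens on the same scheduler thread, whatever the task calls synchronously also runs on that thread, which is the property the transaction discussion relies on.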
With transactions enabled, the select and the update are executed in the same transaction.
the update and select queries are both executed in the same transaction.
Also, when the downstream channel is a DirectChannel (the default), the endpoints run in the same thread, so they share the same transaction as well.
A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread and, hence, the same transaction. That way, if any of them fail, the transaction rolls back and the input data is reverted to its original state.
In this case, if a downstream step fails, the whole thing is rolled back.
JDBC Support / Inbound Channel Adapter / Polling and Transactions
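The select, update, and downstream handling described above can be pictured with a toy in-memory model. This is only a sketch under loud assumptions: PollTransactionSketch is a hypothetical class, the map stands in for the polling_message table, and the snapshot-and-restore logic merely imitates commit and rollback. No real database or Spring transaction manager is involved.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;

class PollTransactionSketch {

    // id -> process_status; a stand-in for the polling_message table.
    final Map<Integer, Integer> table = new LinkedHashMap<>();

    // One poll cycle: select rows with status 0, mark them as 1, then hand the
    // ids to the downstream consumer on the same thread. Returns true on
    // "commit" and false when the downstream failure caused a "rollback".
    boolean pollOnce(Consumer<List<Integer>> downstream) {
        Map<Integer, Integer> snapshot = new LinkedHashMap<>(table); // "begin"
        try {
            List<Integer> ids = table.entrySet().stream()
                    .filter(e -> e.getValue() == 0)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
            if (ids.isEmpty()) {
                return true; // nothing to process, nothing to update
            }
            ids.forEach(id -> table.put(id, 1)); // the "update" step
            downstream.accept(ids);              // same thread, same "transaction"
            return true;                         // "commit"
        } catch (RuntimeException e) {
            table.clear();
            table.putAll(snapshot);              // "rollback": statuses return to 0
            return false;
        }
    }
}
```

If the downstream consumer throws, the statuses revert to 0, so the same rows are picked up again on the next poll; that is the behavior the quoted documentation describes for direct channels.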
The Java DSL documentation is also a useful reference for how to write an Inbound Channel Adapter.
Java DSL / Inbound Channel Adapters
Next, the part that writes to standard output.
    @Bean
    public IntegrationFlow stdout() {
        Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
                })
                .get();
    }
Because the JdbcPollingChannelAdapter uses a BeanPropertyRowMapper, the query results can be handled as PollingMessage objects.
The payload is a List of them, though.
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
After that, each message is simply logged.
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
I used the commented-out lines to look at the stack trace and to throw an exception to confirm that the transaction is rolled back.
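The cast of the payload to List<PollingMessage> in the handler is unchecked, so the compiler cannot verify it. As a hedged alternative, a small helper could validate the payload before use. PayloadSketch and asListOf are hypothetical names introduced here for illustration and are not part of Spring Integration.

```java
import java.util.ArrayList;
import java.util.List;

class PayloadSketch {

    // Hypothetical helper: checks that the payload is a List and that every
    // element is an instance of the expected type, then returns a typed copy.
    static <T> List<T> asListOf(Object payload, Class<T> type) {
        if (!(payload instanceof List<?>)) {
            throw new IllegalArgumentException("payload is not a List: " + payload);
        }
        List<T> result = new ArrayList<>();
        for (Object element : (List<?>) payload) {
            result.add(type.cast(element)); // ClassCastException on a wrong element type
        }
        return result;
    }

    public static void main(String[] args) {
        Object payload = List.of("Hello World!!", "Hello Spring Boot");
        List<String> texts = asListOf(payload, String.class);
        System.out.println(texts.size()); // prints: 2
    }
}
```

In the handler this would be used as asListOf(message.getPayload(), PollingMessage.class), trading a little copying for an early, explicit failure when the payload is not what the flow expects.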
Configuration.
src/main/resources/application.properties
spring.datasource.url=jdbc:mysql://172.17.0.2:3306/practice?characterEncoding=utf-8
spring.datasource.username=kazuhira
spring.datasource.password=password
spring.sql.init.mode=always
logging.level.org.springframework.jdbc.core=debug
logging.level.org.springframework.jdbc.support=debug
I configured logging so that the executed SQL and the transaction activity are written to the log.
I also register data at startup. It includes rows that are excluded from polling.
src/main/resources/data.sql
insert into polling_message(message_text, process_status) values('Hello World!!', 0);
insert into polling_message(message_text, process_status) values('[ignore] こんにちは、世界', 9);
insert into polling_message(message_text, process_status) values('Hello Spring Integration', 0);
insert into polling_message(message_text, process_status) values('Hello Spring Boot', 0);
insert into polling_message(message_text, process_status) values('[ignore] Hello MySQL!!', 9);
Now, let's run it.
Package it,
$ mvn package
and run it.
$ java -jar target/integration-jdbc-polling-0.0.1-SNAPSHOT.jar
The state of the table right after startup.
mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              0 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              0 |
|  4 | Hello Spring Boot                 |              0 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)
Let's look at the log.
2022-05-04 03:05:23.377 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:05:23.378 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:05:23.380 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:05:23.382 INFO 69669 --- [ main] org.littlewings.spring.integration.App : Started App in 3.857 seconds (JVM running for 4.49)
2022-05-04 03:05:23.387 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query
2022-05-04 03:05:23.388 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ]
2022-05-04 03:05:23.449 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:05:23.459 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:05:23.459 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:05:23.470 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL update
2022-05-04 03:05:23.471 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:05:23.497 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 1, message_text = Hello World!!, process_status = 0
2022-05-04 03:05:23.500 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 3, message_text = Hello Spring Integration, process_status = 0
2022-05-04 03:05:23.500 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 4, message_text = Hello Spring Boot, process_status = 0
2022-05-04 03:05:23.501 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:05:23.502 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:05:23.523 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction
2022-05-04 03:05:33.363 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:05:33.365 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:05:33.365 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:05:33.366 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query
2022-05-04 03:05:33.367 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ]
2022-05-04 03:05:33.369 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:05:33.369 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:05:33.370 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction
Start of the transaction.
2022-05-04 03:05:23.377 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:05:23.378 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
Execution of the select statement.
2022-05-04 03:05:23.387 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query
2022-05-04 03:05:23.388 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ]
Execution of the update statement.
2022-05-04 03:05:23.470 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL update
2022-05-04 03:05:23.471 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
Output of the retrieved records.
2022-05-04 03:05:23.497 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 1, message_text = Hello World!!, process_status = 0
2022-05-04 03:05:23.500 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 3, message_text = Hello Spring Integration, process_status = 0
2022-05-04 03:05:23.500 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 4, message_text = Hello Spring Boot, process_status = 0
You can follow everything up to the transaction commit.
2022-05-04 03:05:23.501 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:05:23.502 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
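Judging from the thread name, the whole sequence runs in the same thread.
After that, the select statement runs every 10 seconds, but since the data has already been updated, there is nothing left to process.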
2022-05-04 03:05:33.365 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:05:33.365 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:05:33.366 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query
2022-05-04 03:05:33.367 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ]
2022-05-04 03:05:33.369 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:05:33.369 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:05:33.370 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction
The table now looks like this.
mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              1 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              1 |
|  4 | Hello Spring Boot                 |              1 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)
The select statement used for polling only targets rows whose process_status is 0, so the records set to 9 remain as they were.
Next, let's add some data. It again includes rows that are not processing targets.
mysql> insert into polling_message(message_text, process_status) values('Spring Integration JDBC Support', 0);
Query OK, 1 row affected (0.03 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] Hello InnoDB', 9);
Query OK, 1 row affected (0.03 sec)

mysql> insert into polling_message(message_text, process_status) values('Hello MySQL Connector/J', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] はじめてのSpring Integration', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('JDBCでデータベースポーリング', 0);
Query OK, 1 row affected (0.02 sec)
Then, on the next poll, the data is retrieved and written to standard output.
2022-05-04 03:08:23.363 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:08:23.365 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:08:23.365 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:08:23.366 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query
2022-05-04 03:08:23.366 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ]
2022-05-04 03:08:23.368 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:08:23.369 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:08:23.369 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:08:23.370 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL update
2022-05-04 03:08:23.370 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:08:23.372 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 6, message_text = Spring Integration JDBC Support, process_status = 0
2022-05-04 03:08:23.372 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 8, message_text = Hello MySQL Connector/J, process_status = 0
2022-05-04 03:08:23.372 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 10, message_text = JDBCでデータベースポーリング, process_status = 0
2022-05-04 03:08:23.372 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:08:23.372 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:08:23.462 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction
After that, there is again no data to process.
2022-05-04 03:08:33.364 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:08:33.365 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction
2022-05-04 03:08:33.366 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit
2022-05-04 03:08:33.367 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query
2022-05-04 03:08:33.367 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ]
2022-05-04 03:08:33.368 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:08:33.368 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:08:33.369 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction
The state of the data.
mysql> select * from polling_message;
+----+--------------------------------------------+----------------+
| id | message_text                               | process_status |
+----+--------------------------------------------+----------------+
|  1 | Hello World!!                              |              1 |
|  2 | [ignore] こんにちは、世界                  |              9 |
|  3 | Hello Spring Integration                   |              1 |
|  4 | Hello Spring Boot                          |              1 |
|  5 | [ignore] Hello MySQL!!                     |              9 |
|  6 | Spring Integration JDBC Support            |              1 |
|  7 | [ignore] Hello InnoDB                      |              9 |
|  8 | Hello MySQL Connector/J                    |              1 |
|  9 | [ignore] はじめてのSpring Integration      |              9 |
| 10 | JDBCでデータベースポーリング               |              1 |
+----+--------------------------------------------+----------------+
10 rows in set (0.00 sec)
Looks good.
Polling the database and writing the retrieved data to a table
Next, let's poll the table and this time write the retrieved data to another table.
This uses the Outbound Channel Adapter.
JDBC Support / Outbound Channel Adapter
The destination table is defined like this.
create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);
Comment out the stdout bean definition in the flow configuration class from earlier.
    // @Bean
    public IntegrationFlow stdout() {
Then define a JdbcMessageHandler bean and build a flow that uses it.
    @Bean
    public MessageHandler jdbcMessageHandler(DataSource dataSource) {
        JdbcMessageHandler jdbcMessageHandler =
                new JdbcMessageHandler(
                        dataSource,
                        """
                                insert into processed_message_log(message_text)
                                values(:payload.messageText)
                                """
                );
        return jdbcMessageHandler;
    }

    @Bean
    public IntegrationFlow writeTable() {
        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(jdbcMessageHandler(null))
                .get();
    }
About the :payload.messageText notation in the insert statement:

                        """
                                insert into processed_message_log(message_text)
                                values(:payload.messageText)
                                """
It refers to Message#getPayload.
Message (Spring Framework 5.3.19 API)
Because the JdbcPollingChannelAdapter is configured with a BeanPropertyRowMapper, the payload contains instances of the PollingMessage class created earlier, so this ends up calling PollingMessage#getMessageText.
    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select id, message_text, process_status
                                from polling_message
                                where process_status = 0
                                """
                );
        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));
        return jdbcPollingChannelAdapter;
    }
Note that the expression seems to be applied to one record at a time here.
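One way to picture this per-record evaluation: the payload is a list, the property is read from each element in turn, and each result becomes the argument of one insert. The sketch below is JDK-only and hypothetical (BatchArgsSketch, Row, and batchArgs are illustrative names, not Spring Integration classes); it only shows the shape of the data, not how JdbcMessageHandler is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class BatchArgsSketch {

    // Minimal stand-in for PollingMessage (only the property used here).
    static class Row {
        private final String messageText;
        Row(String messageText) { this.messageText = messageText; }
        String getMessageText() { return messageText; }
    }

    // Hypothetical helper: evaluates the property once per element, yielding
    // one argument array per row, i.e. one entry of a JDBC batch.
    static <T> List<Object[]> batchArgs(Iterable<T> payload, Function<? super T, ?> property) {
        List<Object[]> args = new ArrayList<>();
        for (T item : payload) {
            args.add(new Object[] { property.apply(item) });
        }
        return args;
    }

    public static void main(String[] args) {
        List<Row> payload = List.of(new Row("Hello World!!"), new Row("Hello Spring Boot"));
        for (Object[] rowArgs : batchArgs(payload, Row::getMessageText)) {
            System.out.println("insert into processed_message_log(message_text) values(?) <- " + rowArgs[0]);
        }
    }
}
```

With three polled rows, this shape would give three argument sets for the same insert statement, which fits with the "Executing SQL batch update" line in the log output of this run.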
The JdbcMessageHandler defined above is registered as the handler of the flow.
    @Bean
    public IntegrationFlow writeTable() {
        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(jdbcMessageHandler(null))
                .get();
    }
schema.sql is changed as follows. Since the tables are recreated, the earlier results are discarded.
src/main/resources/schema.sql
drop table if exists polling_message;

create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);

drop table if exists processed_message_log;

create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);
Now let's check that it works.
Package and start it.
$ mvn package
$ java -jar target/integration-jdbc-polling-0.0.1-SNAPSHOT.jar
The log looked like this.
2022-05-04 03:18:33.300 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:18:33.301 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:18:33.303 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:18:33.308 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query
2022-05-04 03:18:33.308 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ]
2022-05-04 03:18:33.310 INFO 70288 --- [ main] org.littlewings.spring.integration.App : Started App in 3.268 seconds (JVM running for 3.712)
2022-05-04 03:18:33.348 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:18:33.350 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:18:33.350 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:18:33.357 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL update
2022-05-04 03:18:33.358 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:18:33.371 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing SQL batch update [insert into processed_message_log(message_text) values(?) ]
2022-05-04 03:18:33.372 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [insert into processed_message_log(message_text) values(?) ]
2022-05-04 03:18:33.373 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcUtils : JDBC driver supports batch updates
2022-05-04 03:18:33.389 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:18:33.390 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:18:33.423 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction
2022-05-04 03:18:43.295 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:18:43.295 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:18:43.296 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:18:43.296 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query
2022-05-04 03:18:43.297 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ]
2022-05-04 03:18:43.299 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:18:43.299 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:18:43.300 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction
The difference from the previous run is that the retrieved rows are no longer written to standard output; instead, an insert statement is executed.
2022-05-04 03:18:33.371 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing SQL batch update [insert into processed_message_log(message_text) values(?) ]
2022-05-04 03:18:33.372 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [insert into processed_message_log(message_text) values(?) ]
2022-05-04 03:18:33.373 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcUtils : JDBC driver supports batch updates
It appears to be performing a batch update.
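As a rough illustration of what a batch update amounts to at the JDBC level, here is a minimal sketch in plain JDBC. The class name `BatchInsertSketch` and the method `insertAll` are hypothetical names made up for this example; this is not how Spring Integration or `JdbcTemplate` is implemented internally, only the general JDBC pattern of queueing parameter sets with `addBatch` and submitting them with `executeBatch`.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Plain-JDBC illustration of a batch insert; not Spring Integration's actual code.
final class BatchInsertSketch {

    // Queues one insert per message text and submits them as a single batch.
    // Returns the number of statements that were part of the batch.
    static int insertAll(Connection connection, List<String> messageTexts) {
        String sql = "insert into processed_message_log(message_text) values(?)";
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            for (String messageText : messageTexts) {
                statement.setString(1, messageText);
                statement.addBatch(); // queue this parameter set instead of executing it now
            }
            int[] updateCounts = statement.executeBatch(); // execute all queued parameter sets
            return updateCounts.length;
        } catch (SQLException e) {
            throw new IllegalStateException("batch insert failed", e);
        }
    }
}
```

The sketch leaves the connection open and does not commit, on the assumption that the caller manages the transaction, as the poller's transaction does in the flow above.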
The commit happens after the insert statement.
2022-05-04 03:18:33.389 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:18:33.390 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
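Because the select, the update, and the insert all run inside the one transaction started by the poller, the status change and the log insert should succeed or fail together. The following is a small in-memory model of that behavior, not Spring Integration's code: `PollCycleSketch`, `Row`, and `pollOnce` are hypothetical names, and "rollback" is modeled simply by returning the original table state when the handler throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// In-memory model of one transactional poll cycle; not Spring Integration's actual code.
final class PollCycleSketch {

    // Hypothetical stand-in for a row of the polling_message table.
    record Row(int id, String messageText, int processStatus) {}

    // Returns the table state after one poll cycle.
    // If the handler throws, the original state is returned, which models a rollback.
    static List<Row> pollOnce(List<Row> table, Consumer<List<Row>> handler) {
        // select ... where process_status = 0
        List<Row> selected = table.stream()
                .filter(row -> row.processStatus() == 0)
                .toList();
        if (selected.isEmpty()) {
            return table; // nothing to process; the transaction commits without changes
        }

        // update ... set process_status = 1 where id in (...)
        List<Row> updated = new ArrayList<>();
        for (Row row : table) {
            updated.add(row.processStatus() == 0
                    ? new Row(row.id(), row.messageText(), 1)
                    : row);
        }

        try {
            handler.accept(selected); // downstream flow, e.g. the insert into the log table
            return List.copyOf(updated); // commit: the status change becomes visible
        } catch (RuntimeException e) {
            return table; // rollback: the rows stay at status 0 and are picked up again
        }
    }

    public static void main(String[] args) {
        List<Row> table = List.of(
                new Row(1, "Hello World!!", 0),
                new Row(2, "[ignore] こんにちは、世界", 9));
        List<String> log = new ArrayList<>();
        List<Row> after = pollOnce(table, rows -> rows.forEach(r -> log.add(r.messageText())));
        System.out.println(after);
        System.out.println(log);
    }
}
```

The model assumes the handler runs on the polling thread, which is the case here because `jdbcPollingChannel` is a default (direct) channel. With an asynchronous channel in between, the downstream work would presumably run outside the poller's transaction.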
Let's check the data.
Initially, the table was in this state:
mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              0 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              0 |
|  4 | Hello Spring Boot                 |              0 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)
After processing, polling_message ends up in the same state as in the previous run:
mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              1 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              1 |
|  4 | Hello Spring Boot                 |              1 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)
In addition, we can confirm that the processed rows have been inserted into the other table by JdbcMessageHandler.
mysql> select * from processed_message_log;
+--------+--------------------------+
| log_id | message_text             |
+--------+--------------------------+
|      1 | Hello World!!            |
|      2 | Hello Spring Integration |
|      3 | Hello Spring Boot        |
+--------+--------------------------+
3 rows in set (0.00 sec)
As before, let's add some data.
mysql> insert into polling_message(message_text, process_status) values('Spring Integration JDBC Support', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] Hello InnoDB', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('Hello MySQL Connector/J', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] はじめてのSpring Integration', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('JDBCでデータベースポーリング', 0);
Query OK, 1 row affected (0.02 sec)
Here is the log.
2022-05-04 03:24:03.294 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:24:03.295 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:24:03.296 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:24:03.296 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query
2022-05-04 03:24:03.296 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ]
2022-05-04 03:24:03.298 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:24:03.298 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:24:03.299 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:24:03.300 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL update
2022-05-04 03:24:03.300 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:24:03.302 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing SQL batch update [insert into processed_message_log(message_text) values(?) ]
2022-05-04 03:24:03.302 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [insert into processed_message_log(message_text) values(?) ]
2022-05-04 03:24:03.302 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcUtils : JDBC driver supports batch updates
2022-05-04 03:24:03.306 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:24:03.306 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:24:03.662 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction
2022-05-04 03:24:13.294 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:24:13.295 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:24:13.295 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:24:13.296 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query
2022-05-04 03:24:13.296 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ]
2022-05-04 03:24:13.297 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:24:13.297 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:24:13.298 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction
The newly inserted rows were detected and processed.
2022-05-04 03:24:03.302 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing SQL batch update [insert into processed_message_log(message_text) values(?) ]
2022-05-04 03:24:03.302 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [insert into processed_message_log(message_text) values(?) ]
2022-05-04 03:24:03.302 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcUtils : JDBC driver supports batch updates
The data has been updated as well.
mysql> select * from polling_message;
+----+--------------------------------------------+----------------+
| id | message_text                               | process_status |
+----+--------------------------------------------+----------------+
|  1 | Hello World!!                              |              1 |
|  2 | [ignore] こんにちは、世界                  |              9 |
|  3 | Hello Spring Integration                   |              1 |
|  4 | Hello Spring Boot                          |              1 |
|  5 | [ignore] Hello MySQL!!                     |              9 |
|  6 | Spring Integration JDBC Support            |              1 |
|  7 | [ignore] Hello InnoDB                      |              9 |
|  8 | Hello MySQL Connector/J                    |              1 |
|  9 | [ignore] はじめてのSpring Integration      |              9 |
| 10 | JDBCでデータベースポーリング               |              1 |
+----+--------------------------------------------+----------------+
10 rows in set (0.00 sec)

mysql> select * from processed_message_log;
+--------+------------------------------------------+
| log_id | message_text                             |
+--------+------------------------------------------+
|      1 | Hello World!!                            |
|      2 | Hello Spring Integration                 |
|      3 | Hello Spring Boot                        |
|      4 | Spring Integration JDBC Support          |
|      5 | Hello MySQL Connector/J                  |
|      6 | JDBCでデータベースポーリング             |
+--------+------------------------------------------+
6 rows in set (0.00 sec)
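This confirms that the Outbound Channel Adapter works as well.
Summary
I tried out database polling using Spring Integration's JDBC support.
I struggled a little with expressing the JDBC support features in the Java DSL, but once I figured that out, getting it to run was fairly straightforward.
I feel like I'm gradually getting used to it.
Finally, here is the full class that defines the flows. This is the version with the Outbound Channel Adapter enabled.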
src/main/java/org/littlewings/spring/integration/JdbcPollingConfig.java
package org.littlewings.spring.integration;

import java.time.Duration;
import java.util.List;
import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.messaging.MessageHandler;

@Configuration
public class JdbcPollingConfig {
    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                        select id, message_text, process_status
                        from polling_message
                        where process_status = 0
                        """
                );
        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));
        return jdbcPollingChannelAdapter;
    }

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

    // @Bean
    public IntegrationFlow stdout() {
        Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
                })
                .get();
    }

    @Bean
    public MessageHandler jdbcMessageHandler(DataSource dataSource) {
        JdbcMessageHandler jdbcMessageHandler =
                new JdbcMessageHandler(
                        dataSource,
                        """
                        insert into processed_message_log(message_text)
                        values(:payload.messageText)
                        """
                );
        return jdbcMessageHandler;
    }

    @Bean
    public IntegrationFlow writeTable() {
        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(jdbcMessageHandler(null))
                .get();
    }
}