I heard that Spring Integration supports database polling over JDBC, so I gave it a quick try.
Polling and JDBC support in Spring Integration
Spring Integration provides a polling mechanism.
Messaging Channels / Pollable Message Source
Spring Integration also includes JDBC support.
It offers an Inbound Channel Adapter, an Outbound Channel Adapter, an Outbound Gateway, and more.
This time, I'd like to try the Inbound Channel Adapter and the Outbound Channel Adapter.
$ java --version
openjdk 17.0.3 2022-04-19
OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)

$ mvn --version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-109-generic", arch: "amd64", family: "unix"

$ mysql --version
mysql  Ver 8.0.29 for Linux on x86_64 (MySQL Community Server - GPL)
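- Use the Inbound Channel Adapter from the JDBC support to poll rows registered in a table every 10 seconds
- The table has a status column; only rows with process_status = 0 are polled, and polled rows are updated to process_status = 1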
create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);
- For the rows retrieved by polling, do the following
- Write them to standard output
- Use the Outbound Channel Adapter from the JDBC support to write them to another table
create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);
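To make the requirements above concrete, here is a minimal in-memory sketch of one select-then-mark polling cycle in plain Java, with no Spring or database involved. The `Row` class and the `pollOnce` method are hypothetical names used only for illustration; in the rest of this post the real work is done by the JDBC Inbound Channel Adapter.

```java
import java.util.ArrayList;
import java.util.List;

class PollingCycleSketch {
    // Hypothetical in-memory stand-in for one row of polling_message.
    static final class Row {
        final int id;
        final String messageText;
        int processStatus;

        Row(int id, String messageText, int processStatus) {
            this.id = id;
            this.messageText = messageText;
            this.processStatus = processStatus;
        }
    }

    // One polling cycle: pick rows with status 0, mark them as 1, return their texts.
    static List<String> pollOnce(List<Row> table) {
        List<String> picked = new ArrayList<>();
        for (Row row : table) {
            if (row.processStatus == 0) {
                picked.add(row.messageText);
                row.processStatus = 1;
            }
        }
        return picked;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, "Hello World!!", 0));
        table.add(new Row(2, "[ignore] skipped", 9));
        table.add(new Row(3, "Hello Spring Boot", 0));

        System.out.println(pollOnce(table)); // first cycle picks the two status-0 rows
        System.out.println(pollOnce(table)); // second cycle finds nothing new
    }
}
```

Rows with any other status (9 in the sample data) are never touched, which is what lets the same table be polled repeatedly without reprocessing.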
First, create a Spring Boot project with Spring Initializr.
$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.6.7 \
  -d javaVersion=17 \
  -d name=integration-jdbc-polling \
  -d groupId=org.littlewings \
  -d artifactId=integration-jdbc-polling \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.integration \
  -d dependencies=integration,jdbc,mysql \
  -d baseDir=integration-jdbc-polling | tar zxvf -
$ cd integration-jdbc-polling
$ rm src/main/java/org/littlewings/spring/integration/IntegrationJdbcPollingApplication.java src/test/java/org/littlewings/spring/integration/IntegrationJdbcPollingApplicationTests.java
<properties>
    <java.version>17</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-jdbc</artifactId>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
Note that the first time I tried Spring Integration I used the Redis Inbound Channel Adapter, but on that occasion web
Spring Integrationを試してみる - CLOVER🍀
package org.littlewings.spring.integration;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}
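- Use the Inbound Channel Adapter from the JDBC support to poll rows registered in a table every 10 seconds
- The table has a status column; only rows with process_status = 0 are polled, and polled rows are updated to process_status = 1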
drop table if exists polling_message;

create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);
package org.littlewings.spring.integration;

public class PollingMessage {
    Integer id;
    String messageText;
    Integer processStatus;

    // getters/setters omitted
}
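For reference, the omitted accessors could look like the following. `BeanPropertyRowMapper` populates the object through its setters, so they need to exist. This is a plain JavaBean sketch rather than the exact code from the project, and the package declaration and `public` modifier are left out only to keep the snippet standalone.

```java
class PollingMessage {
    private Integer id;
    private String messageText;
    private Integer processStatus;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getMessageText() {
        return messageText;
    }

    public void setMessageText(String messageText) {
        this.messageText = messageText;
    }

    public Integer getProcessStatus() {
        return processStatus;
    }

    public void setProcessStatus(Integer processStatus) {
        this.processStatus = processStatus;
    }
}
```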
package org.littlewings.spring.integration;

import java.time.Duration;
import java.util.List;
import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.messaging.MessageHandler;

@Configuration
public class JdbcPollingConfig {
    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                                select
                                  id, message_text, process_status
                                from
                                  polling_message
                                where
                                  process_status = 0
                                """
                );
        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

        return jdbcPollingChannelAdapter;
    }

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

    @Bean
    public IntegrationFlow stdout() {
        Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
                })
                .get();
    }
}
For the Inbound Channel Adapter part, I referred to this section of the documentation.
JDBC Support / Inbound Channel Adapter
@Bean
public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
    JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
            new JdbcPollingChannelAdapter(
                    dataSource,
                    """
                            select
                              id, message_text, process_status
                            from
                              polling_message
                            where
                              process_status = 0
                            """
            );
    jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
    jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

    return jdbcPollingChannelAdapter;
}

@Bean
public IntegrationFlow messagePolling() {
    return IntegrationFlows
            .from(
                    jdbcMessageSource(null),
                    c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
            )
            .channel("jdbcPollingChannel")
            .get();
}

JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
        new JdbcPollingChannelAdapter(
                dataSource,
                """
                        select
                          id, message_text, process_status
                        from
                          polling_message
                        where
                          process_status = 0
                        """
        );
jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
Parameters prefixed with a colon (:) refer to the values of each row retrieved by polling; this relies on a Spring JDBC feature.
The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which, in the preceding example, is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC, combined with a convention (projection onto the polled result list) adopted in Spring Integration.
JDBC Support / Inbound Channel Adapter
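As a rough illustration of what that projection amounts to, the sketch below expands `:id` into one `?` per polled row, which is the shape that shows up in the debug log (`where id in (?, ?, ?)`). It is a simplified stand-in written in plain Java, not Spring's actual implementation, and `expandInClause` is a hypothetical helper.

```java
import java.util.Collections;
import java.util.List;

class NamedParameterSketch {
    // Replaces a single ":name" placeholder with "?, ?, ..." (one "?" per value).
    static String expandInClause(String sql, String name, List<?> values) {
        String placeholders = String.join(", ", Collections.nCopies(values.size(), "?"));
        return sql.replace(":" + name, placeholders);
    }

    public static void main(String[] args) {
        // ids of the rows returned by the select (taken from the sample data)
        List<Integer> polledIds = List.of(1, 3, 4);
        String sql = "update polling_message set process_status = 1 where id in (:id)";

        System.out.println(expandInClause(sql, "id", polledIds));
    }
}
```

The values themselves would then be bound to the generated placeholders in order, so the update only touches the rows that were just polled.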
jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));
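`BeanPropertyRowMapper` matches underscore-separated column names to camel-case properties, which is why `message_text` ends up in `messageText` in the debug log. The following plain-Java sketch mimics only that naming rule. `toPropertyName` is a hypothetical helper, and the real mapper does considerably more (type conversion, for example).

```java
import java.util.Locale;

class ColumnNameSketch {
    // Converts a column name such as "message_text" to a property name such as "messageText".
    static String toPropertyName(String column) {
        StringBuilder property = new StringBuilder();
        boolean upperNext = false;
        for (char c : column.toLowerCase(Locale.ROOT).toCharArray()) {
            if (c == '_') {
                upperNext = true; // drop the underscore, capitalize the next letter
            } else if (upperNext) {
                property.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                property.append(c);
            }
        }
        return property.toString();
    }

    public static void main(String[] args) {
        for (String column : new String[] {"id", "message_text", "process_status"}) {
            System.out.println(column + " -> " + toPropertyName(column));
        }
    }
}
```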
@Bean
public IntegrationFlow messagePolling() {
    return IntegrationFlows
            .from(
                    jdbcMessageSource(null),
                    c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
            )
            .channel("jdbcPollingChannel")
            .get();
}

.from(
        jdbcMessageSource(null),
        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
)
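`Pollers.fixedRate(Duration.ofSeconds(10L))` runs the poll on a fixed schedule. Conceptually this resembles `ScheduledExecutorService.scheduleAtFixedRate` from the JDK. The sketch below uses that JDK API directly, with a short period so that it finishes quickly; it is not how Spring Integration wires its own task scheduler, and `awaitPolls` is a hypothetical helper.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class FixedRateSketch {
    // Runs pollTask every periodMillis and waits until it has run `times` times.
    // Returns false if that does not happen within 5 seconds or the wait is interrupted.
    static boolean awaitPolls(int times, long periodMillis, Runnable pollTask) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(times);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                pollTask.run();
                latch.countDown();
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean completed = awaitPolls(3, 50, () -> System.out.println("polling..."));
        System.out.println("completed: " + completed);
    }
}
```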
the update and select queries are both executed in the same transaction.
A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread and, hence, the same transaction. That way, if any of them fail, the transaction rolls back and the input data is reverted to its original state.
JDBC Support / Inbound Channel Adapter / Polling and Transactions
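The effect described in the quote can be pictured with a small in-memory simulation: the status update and the downstream handler run as one unit of work, and if the handler throws, the statuses are restored. This is only a sketch of the behavior in plain Java. `pollInTransaction` and the snapshot-based rollback are assumptions made for illustration and say nothing about how Spring's transaction manager is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

class RollbackSketch {
    // table maps id -> process_status. Returns true when the "transaction" committed.
    static boolean pollInTransaction(Map<Integer, Integer> table, Consumer<List<Integer>> handler) {
        Map<Integer, Integer> snapshot = new HashMap<>(table); // state to restore on failure
        try {
            List<Integer> polled = new ArrayList<>();
            for (Map.Entry<Integer, Integer> entry : table.entrySet()) {
                if (entry.getValue() == 0) {
                    polled.add(entry.getKey());
                    entry.setValue(1); // the "update" step
                }
            }
            handler.accept(polled); // downstream endpoint, invoked on the same thread
            return true;
        } catch (RuntimeException e) {
            table.clear();
            table.putAll(snapshot); // roll back: statuses return to their original values
            return false;
        }
    }

    public static void main(String[] args) {
        Map<Integer, Integer> table = new TreeMap<>(Map.of(1, 0, 2, 9, 3, 0));

        boolean first = pollInTransaction(table, ids -> { throw new RuntimeException("oops"); });
        System.out.println(first + " " + table);

        boolean second = pollInTransaction(table, ids -> System.out.println("handled " + ids));
        System.out.println(second + " " + table);
    }
}
```

In the actual project, the same experiment can be tried by enabling the commented-out `throw new RuntimeException("oops")` line in the handler.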
The Java DSL documentation is also a useful reference for how to write an Inbound Channel Adapter.
Java DSL / Inbound Channel Adapters
@Bean
public IntegrationFlow stdout() {
    Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

    return IntegrationFlows
            .from("jdbcPollingChannel")
            .handle(message -> {
                List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                pollingMessages
                        .forEach(
                                m -> {
                                    // Thread.dumpStack();
                                    logger.info(
                                            "process message: id = {}, message_text = {}, process_status = {}",
                                            m.getId(),
                                            m.getMessageText(),
                                            m.getProcessStatus()
                                    );
                                    // throw new RuntimeException("oops");
                                }
                        );
            })
            .get();
}

.handle(message -> {
    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();

pollingMessages
        .forEach(
                m -> {
                    // Thread.dumpStack();
                    logger.info(
                            "process message: id = {}, message_text = {}, process_status = {}",
                            m.getId(),
                            m.getMessageText(),
                            m.getProcessStatus()
                    );
                    // throw new RuntimeException("oops");
                }
        );
spring.datasource.url=jdbc:mysql://
spring.datasource.username=kazuhira
spring.datasource.password=password
spring.sql.init.mode=always
logging.level.org.springframework.jdbc.core=debug
logging.level.org.springframework.jdbc.support=debug

insert into polling_message(message_text, process_status) values('Hello World!!', 0);
insert into polling_message(message_text, process_status) values('[ignore] こんにちは、世界', 9);
insert into polling_message(message_text, process_status) values('Hello Spring Integration', 0);
insert into polling_message(message_text, process_status) values('Hello Spring Boot', 0);
insert into polling_message(message_text, process_status) values('[ignore] Hello MySQL!!', 9);
$ mvn package
$ java -jar target/integration-jdbc-polling-0.0.1-SNAPSHOT.jar
mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              0 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              0 |
|  4 | Hello Spring Boot                 |              0 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)
2022-05-04 03:05:23.377 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT 2022-05-04 03:05:23.378 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction 2022-05-04 03:05:23.380 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit 2022-05-04 03:05:23.382 INFO 69669 --- [ main] org.littlewings.spring.integration.App : Started App in 3.857 seconds (JVM running for 4.49) 2022-05-04 03:05:23.387 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query 2022-05-04 03:05:23.388 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ] 2022-05-04 03:05:23.449 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'id' to property 'id' of type 'java.lang.Integer' 2022-05-04 03:05:23.459 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String' 2022-05-04 03:05:23.459 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer' 2022-05-04 03:05:23.470 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL update 2022-05-04 03:05:23.471 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, 
?, ?)] 2022-05-04 03:05:23.497 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 1, message_text = Hello World!!, process_status = 0 2022-05-04 03:05:23.500 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 3, message_text = Hello Spring Integration, process_status = 0 2022-05-04 03:05:23.500 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 4, message_text = Hello Spring Boot, process_status = 0 2022-05-04 03:05:23.501 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit 2022-05-04 03:05:23.502 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] 2022-05-04 03:05:23.523 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction 2022-05-04 03:05:33.363 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT 2022-05-04 03:05:33.365 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction 2022-05-04 03:05:33.365 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit 2022-05-04 03:05:33.366 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query 2022-05-04 03:05:33.367 DEBUG 69669 --- [ scheduling-1] 
o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ] 2022-05-04 03:05:33.369 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit 2022-05-04 03:05:33.369 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] 2022-05-04 03:05:33.370 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction
2022-05-04 03:05:23.377 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:05:23.378 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction

2022-05-04 03:05:23.387 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query
2022-05-04 03:05:23.388 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ]

2022-05-04 03:05:23.470 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL update
2022-05-04 03:05:23.471 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]

2022-05-04 03:05:23.497 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 1, message_text = Hello World!!, process_status = 0
2022-05-04 03:05:23.500 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 3, message_text = Hello Spring Integration, process_status = 0
2022-05-04 03:05:23.500 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 4, message_text = Hello Spring Boot, process_status = 0

2022-05-04 03:05:23.501 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:05:23.502 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@14998526 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb]
2022-05-04 03:05:33.365 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction 2022-05-04 03:05:33.365 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit 2022-05-04 03:05:33.366 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query 2022-05-04 03:05:33.367 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ] 2022-05-04 03:05:33.369 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit 2022-05-04 03:05:33.369 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] 2022-05-04 03:05:33.370 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@274205753 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction
mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              1 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              1 |
|  4 | Hello Spring Boot                 |              1 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)

mysql> insert into polling_message(message_text, process_status) values('Spring Integration JDBC Support', 0);
Query OK, 1 row affected (0.03 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] Hello InnoDB', 9);
Query OK, 1 row affected (0.03 sec)

mysql> insert into polling_message(message_text, process_status) values('Hello MySQL Connector/J', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] はじめてのSpring Integration', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('JDBCでデータベースポーリング', 0);
Query OK, 1 row affected (0.02 sec)
2022-05-04 03:08:23.363 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT 2022-05-04 03:08:23.365 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction 2022-05-04 03:08:23.365 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit 2022-05-04 03:08:23.366 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query 2022-05-04 03:08:23.366 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ] 2022-05-04 03:08:23.368 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'id' to property 'id' of type 'java.lang.Integer' 2022-05-04 03:08:23.369 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String' 2022-05-04 03:08:23.369 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer' 2022-05-04 03:08:23.370 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL update 2022-05-04 03:08:23.370 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)] 2022-05-04 03:08:23.372 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 6, message_text 
= Spring Integration JDBC Support, process_status = 0 2022-05-04 03:08:23.372 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 8, message_text = Hello MySQL Connector/J, process_status = 0 2022-05-04 03:08:23.372 INFO 69669 --- [ scheduling-1] o.l.s.integration.JdbcPollingConfig : process message: id = 10, message_text = JDBCでデータベースポーリング, process_status = 0 2022-05-04 03:08:23.372 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit 2022-05-04 03:08:23.372 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] 2022-05-04 03:08:23.462 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@229257291 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction
2022-05-04 03:08:33.364 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$559/0x0000000800f94200.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT 2022-05-04 03:08:33.365 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] for JDBC transaction 2022-05-04 03:08:33.366 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] to manual commit 2022-05-04 03:08:33.367 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query 2022-05-04 03:08:33.367 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ] 2022-05-04 03:08:33.368 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit 2022-05-04 03:08:33.368 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] 2022-05-04 03:08:33.369 DEBUG 69669 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@1168361095 wrapping com.mysql.cj.jdbc.ConnectionImpl@280b26fb] after transaction
mysql> select * from polling_message;
+----+--------------------------------------------+----------------+
| id | message_text                               | process_status |
+----+--------------------------------------------+----------------+
|  1 | Hello World!!                              |              1 |
|  2 | [ignore] こんにちは、世界                  |              9 |
|  3 | Hello Spring Integration                   |              1 |
|  4 | Hello Spring Boot                          |              1 |
|  5 | [ignore] Hello MySQL!!                     |              9 |
|  6 | Spring Integration JDBC Support            |              1 |
|  7 | [ignore] Hello InnoDB                      |              9 |
|  8 | Hello MySQL Connector/J                    |              1 |
|  9 | [ignore] はじめてのSpring Integration      |              9 |
| 10 | JDBCでデータベースポーリング               |              1 |
+----+--------------------------------------------+----------------+
10 rows in set (0.00 sec)
Next, I use the Outbound Channel Adapter.
JDBC Support / Outbound Channel Adapter
create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);

// @Bean
public IntegrationFlow stdout() {

@Bean
public MessageHandler jdbcMessageHandler(DataSource dataSource) {
    JdbcMessageHandler jdbcMessageHandler =
            new JdbcMessageHandler(
                    dataSource,
                    """
                            insert into processed_message_log(message_text)
                            values(:payload.messageText)
                            """
            );

    return jdbcMessageHandler;
}

@Bean
public IntegrationFlow writeTable() {
    return IntegrationFlows
            .from("jdbcPollingChannel")
            .handle(jdbcMessageHandler(null))
            .get();
}
""" insert into processed_message_log(message_text) values(:payload.messageText) """
Message (Spring Framework 5.3.19 API)
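Since the payload here is a `List<PollingMessage>` and the debug log reports a batch update, each element appears to be treated as its own payload when `:payload.messageText` is resolved. The following plain-Java sketch mimics that idea by producing one parameter set per list element. `Item` and `toBatchParameters` are hypothetical names, and this is not the `JdbcMessageHandler` implementation.

```java
import java.util.ArrayList;
import java.util.List;

class BatchParameterSketch {
    // Hypothetical stand-in for PollingMessage, reduced to the one property used by the insert.
    static final class Item {
        private final String messageText;

        Item(String messageText) {
            this.messageText = messageText;
        }

        String getMessageText() {
            return messageText;
        }
    }

    // Builds one parameter array per payload element, i.e. one insert per polled row.
    static List<Object[]> toBatchParameters(List<Item> payload) {
        List<Object[]> batch = new ArrayList<>();
        for (Item item : payload) {
            batch.add(new Object[] {item.getMessageText()});
        }
        return batch;
    }

    public static void main(String[] args) {
        List<Item> payload = List.of(new Item("Hello World!!"), new Item("Hello Spring Boot"));
        for (Object[] parameters : toBatchParameters(payload)) {
            System.out.println("insert into processed_message_log(message_text) values(?) <- " + parameters[0]);
        }
    }
}
```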
@Bean
public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
    JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
            new JdbcPollingChannelAdapter(
                    dataSource,
                    """
                            select
                              id, message_text, process_status
                            from
                              polling_message
                            where
                              process_status = 0
                            """
            );
    jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
    jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

    return jdbcPollingChannelAdapter;
}

@Bean
public IntegrationFlow writeTable() {
    return IntegrationFlows
            .from("jdbcPollingChannel")
            .handle(jdbcMessageHandler(null))
            .get();
}

drop table if exists polling_message;

create table polling_message(
  id integer auto_increment,
  message_text text,
  process_status integer,
  primary key(id)
);

drop table if exists processed_message_log;

create table processed_message_log(
  log_id integer auto_increment,
  message_text text,
  primary key(log_id)
);

$ mvn package
$ java -jar target/integration-jdbc-polling-0.0.1-SNAPSHOT.jar
2022-05-04 03:18:33.300 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT 2022-05-04 03:18:33.301 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction 2022-05-04 03:18:33.303 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit 2022-05-04 03:18:33.308 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query 2022-05-04 03:18:33.308 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ] 2022-05-04 03:18:33.310 INFO 70288 --- [ main] org.littlewings.spring.integration.App : Started App in 3.268 seconds (JVM running for 3.712) 2022-05-04 03:18:33.348 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'id' to property 'id' of type 'java.lang.Integer' 2022-05-04 03:18:33.350 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String' 2022-05-04 03:18:33.350 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer' 2022-05-04 03:18:33.357 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL update 2022-05-04 03:18:33.358 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [update polling_message set process_status = 1 where id in 
(?, ?, ?)] 2022-05-04 03:18:33.371 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing SQL batch update [insert into processed_message_log(message_text) values(?) ] 2022-05-04 03:18:33.372 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [insert into processed_message_log(message_text) values(?) ] 2022-05-04 03:18:33.373 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcUtils : JDBC driver supports batch updates 2022-05-04 03:18:33.389 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit 2022-05-04 03:18:33.390 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] 2022-05-04 03:18:33.423 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction 2022-05-04 03:18:43.295 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT 2022-05-04 03:18:43.295 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction 2022-05-04 03:18:43.296 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit 2022-05-04 03:18:43.296 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query 2022-05-04 03:18:43.297 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing 
prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ] 2022-05-04 03:18:43.299 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit 2022-05-04 03:18:43.299 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] 2022-05-04 03:18:43.300 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@949481648 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction
2022-05-04 03:18:33.371 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing SQL batch update [insert into processed_message_log(message_text) values(?) ]
2022-05-04 03:18:33.372 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [insert into processed_message_log(message_text) values(?) ]
2022-05-04 03:18:33.373 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcUtils : JDBC driver supports batch updates

2022-05-04 03:18:33.389 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:18:33.390 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@487352537 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              0 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              0 |
|  4 | Hello Spring Boot                 |              0 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)
mysql> select * from polling_message;
+----+-----------------------------------+----------------+
| id | message_text                      | process_status |
+----+-----------------------------------+----------------+
|  1 | Hello World!!                     |              1 |
|  2 | [ignore] こんにちは、世界         |              9 |
|  3 | Hello Spring Integration          |              1 |
|  4 | Hello Spring Boot                 |              1 |
|  5 | [ignore] Hello MySQL!!            |              9 |
+----+-----------------------------------+----------------+
5 rows in set (0.00 sec)
mysql> select * from processed_message_log;
+--------+--------------------------+
| log_id | message_text             |
+--------+--------------------------+
|      1 | Hello World!!            |
|      2 | Hello Spring Integration |
|      3 | Hello Spring Boot        |
+--------+--------------------------+
3 rows in set (0.00 sec)
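To make the state transitions in these tables a bit more concrete, here is a minimal in-memory sketch of one polling cycle in plain Java. It is not Spring Integration code: `PollCycleSketch`, `Row`, and `pollOnce` are hypothetical names I made up for illustration, and the lists merely stand in for the `polling_message` and `processed_message_log` tables. It only mirrors what the SQL does: pick up rows with `process_status = 0`, mark them as `1`, and copy their text to the log table.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of one polling cycle; all names here are hypothetical.
class PollCycleSketch {
    // In-memory stand-in for a row of the polling_message table.
    static final class Row {
        final int id;
        final String messageText;
        int processStatus;

        Row(int id, String messageText, int processStatus) {
            this.id = id;
            this.messageText = messageText;
            this.processStatus = processStatus;
        }
    }

    // Selects rows with status 0, marks them as 1, and appends their text to the log.
    // Returns the number of rows handled in this cycle.
    static int pollOnce(List<Row> pollingMessage, List<String> processedMessageLog) {
        List<Row> selected = new ArrayList<>();
        for (Row row : pollingMessage) {
            if (row.processStatus == 0) {
                selected.add(row);
            }
        }
        for (Row row : selected) {
            row.processStatus = 1;
            processedMessageLog.add(row.messageText);
        }
        return selected.size();
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>(List.of(
                new Row(1, "Hello World!!", 0),
                new Row(2, "[ignore] こんにちは、世界", 9),
                new Row(3, "Hello Spring Integration", 0),
                new Row(4, "Hello Spring Boot", 0),
                new Row(5, "[ignore] Hello MySQL!!", 9)));
        List<String> log = new ArrayList<>();

        System.out.println(pollOnce(table, log)); // first cycle handles ids 1, 3 and 4
        System.out.println(pollOnce(table, log)); // second cycle finds nothing to do
        System.out.println(log);
    }
}
```

Rows with status 9 are never selected, which matches the `[ignore]` rows staying untouched in the table above.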
mysql> insert into polling_message(message_text, process_status) values('Spring Integration JDBC Support', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] Hello InnoDB', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('Hello MySQL Connector/J', 0);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('[ignore] はじめてのSpring Integration', 9);
Query OK, 1 row affected (0.02 sec)

mysql> insert into polling_message(message_text, process_status) values('JDBCでデータベースポーリング', 0);
Query OK, 1 row affected (0.02 sec)
2022-05-04 03:24:03.294 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:24:03.295 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:24:03.296 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:24:03.296 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query
2022-05-04 03:24:03.296 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ]
2022-05-04 03:24:03.298 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'id' to property 'id' of type 'java.lang.Integer'
2022-05-04 03:24:03.298 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'message_text' to property 'messageText' of type 'java.lang.String'
2022-05-04 03:24:03.299 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.BeanPropertyRowMapper : Mapping column 'process_status' to property 'processStatus' of type 'java.lang.Integer'
2022-05-04 03:24:03.300 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL update
2022-05-04 03:24:03.300 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [update polling_message set process_status = 1 where id in (?, ?, ?)]
2022-05-04 03:24:03.302 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing SQL batch update [insert into processed_message_log(message_text) values(?) ]
2022-05-04 03:24:03.302 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [insert into processed_message_log(message_text) values(?) ]
2022-05-04 03:24:03.302 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcUtils : JDBC driver supports batch updates
2022-05-04 03:24:03.306 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:24:03.306 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:24:03.662 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@766285886 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction
2022-05-04 03:24:13.294 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$558/0x0000000800f90000.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-05-04 03:24:13.295 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Acquired Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] for JDBC transaction
2022-05-04 03:24:13.295 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Switching JDBC Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] to manual commit
2022-05-04 03:24:13.296 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL query
2022-05-04 03:24:13.296 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [select id, message_text, process_status from polling_message where process_status = 0 ]
2022-05-04 03:24:13.297 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Initiating transaction commit
2022-05-04 03:24:13.297 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Committing JDBC transaction on Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26]
2022-05-04 03:24:13.298 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcTransactionManager : Releasing JDBC Connection [HikariProxyConnection@2037769397 wrapping com.mysql.cj.jdbc.ConnectionImpl@5b721b26] after transaction
2022-05-04 03:24:03.302 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing SQL batch update [insert into processed_message_log(message_text) values(?) ]
2022-05-04 03:24:03.302 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.core.JdbcTemplate : Executing prepared SQL statement [insert into processed_message_log(message_text) values(?) ]
2022-05-04 03:24:03.302 DEBUG 70288 --- [ scheduling-1] o.s.jdbc.support.JdbcUtils : JDBC driver supports batch updates
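This excerpt shows a single "Executing SQL batch update" for the polled rows rather than one insert per row. As a rough illustration of what a batch means here (one SQL statement, one parameter set per element of the polled list), the following plain-Java sketch builds the argument arrays such a batch would bind. `BatchParameterSketch`, `MessageRow`, and `toBatchArgs` are hypothetical names for this illustration only, and the record is a stand-in for the polled message class; this is not how `JdbcMessageHandler` is implemented.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java illustration of "one statement, many parameter sets"; names are hypothetical.
class BatchParameterSketch {
    // Stand-in for a polled row (the real class in the application is a separate bean).
    record MessageRow(Integer id, String messageText, Integer processStatus) {}

    // The insert has a single placeholder, so each batch entry carries one value.
    static final String INSERT_SQL = "insert into processed_message_log(message_text) values(?)";

    // Builds one argument array per polled row, in the order the rows were polled.
    static List<Object[]> toBatchArgs(List<MessageRow> payload) {
        return payload.stream()
                .map(row -> new Object[] {row.messageText()})
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<MessageRow> payload = List.of(
                new MessageRow(6, "Spring Integration JDBC Support", 0),
                new MessageRow(8, "Hello MySQL Connector/J", 0),
                new MessageRow(10, "JDBCでデータベースポーリング", 0));

        System.out.println(INSERT_SQL);
        for (Object[] batchEntry : toBatchArgs(payload)) {
            System.out.println("  bind: " + batchEntry[0]);
        }
    }
}
```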
mysql> select * from polling_message;
+----+--------------------------------------------+----------------+
| id | message_text                               | process_status |
+----+--------------------------------------------+----------------+
|  1 | Hello World!!                              |              1 |
|  2 | [ignore] こんにちは、世界                  |              9 |
|  3 | Hello Spring Integration                   |              1 |
|  4 | Hello Spring Boot                          |              1 |
|  5 | [ignore] Hello MySQL!!                     |              9 |
|  6 | Spring Integration JDBC Support            |              1 |
|  7 | [ignore] Hello InnoDB                      |              9 |
|  8 | Hello MySQL Connector/J                    |              1 |
|  9 | [ignore] はじめてのSpring Integration      |              9 |
| 10 | JDBCでデータベースポーリング               |              1 |
+----+--------------------------------------------+----------------+
10 rows in set (0.00 sec)

mysql> select * from processed_message_log;
+--------+------------------------------------------+
| log_id | message_text                             |
+--------+------------------------------------------+
|      1 | Hello World!!                            |
|      2 | Hello Spring Integration                 |
|      3 | Hello Spring Boot                        |
|      4 | Spring Integration JDBC Support          |
|      5 | Hello MySQL Connector/J                  |
|      6 | JDBCでデータベースポーリング             |
+--------+------------------------------------------+
6 rows in set (0.00 sec)
With this, I was able to confirm that the Outbound Channel Adapter works as well.
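I tried out database polling using the JDBC support in Spring Integration.
I struggled a little with how to express the JDBC support features in the Java DSL, but once I figured that out, getting it to run was fairly straightforward.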
Finally, here is the whole class that defines the flows. This is the version with the Outbound Channel Adapter enabled.
package org.littlewings.spring.integration;

import java.time.Duration;
import java.util.List;
import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.messaging.MessageHandler;

@Configuration
public class JdbcPollingConfig {
    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
                new JdbcPollingChannelAdapter(
                        dataSource,
                        """
                        select id, message_text, process_status
                        from polling_message
                        where process_status = 0
                        """
                );
        jdbcPollingChannelAdapter.setUpdateSql("update polling_message set process_status = 1 where id in (:id)");
        jdbcPollingChannelAdapter.setRowMapper(new BeanPropertyRowMapper<>(PollingMessage.class));

        return jdbcPollingChannelAdapter;
    }

    @Bean
    public IntegrationFlow messagePolling() {
        return IntegrationFlows
                .from(
                        jdbcMessageSource(null),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10L)).transactional())
                )
                .channel("jdbcPollingChannel")
                .get();
    }

    // @Bean
    public IntegrationFlow stdout() {
        Logger logger = LoggerFactory.getLogger(JdbcPollingConfig.class);

        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(message -> {
                    List<PollingMessage> pollingMessages = (List<PollingMessage>) message.getPayload();
                    pollingMessages
                            .forEach(
                                    m -> {
                                        // Thread.dumpStack();
                                        logger.info(
                                                "process message: id = {}, message_text = {}, process_status = {}",
                                                m.getId(),
                                                m.getMessageText(),
                                                m.getProcessStatus()
                                        );
                                        // throw new RuntimeException("oops");
                                    }
                            );
                })
                .get();
    }

    @Bean
    public MessageHandler jdbcMessageHandler(DataSource dataSource) {
        JdbcMessageHandler jdbcMessageHandler =
                new JdbcMessageHandler(
                        dataSource,
                        """
                        insert into processed_message_log(message_text)
                        values(:payload.messageText)
                        """
                );

        return jdbcMessageHandler;
    }

    @Bean
    public IntegrationFlow writeTable() {
        return IntegrationFlows
                .from("jdbcPollingChannel")
                .handle(jdbcMessageHandler(null))
                .get();
    }
}
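One detail worth spelling out: the update SQL in `jdbcMessageSource` has a single named parameter, `where id in (:id)`, while the log shows it executed as `where id in (?, ?, ?)`. As far as I can tell from that log, the ids of all polled rows are bound as a list and the parameter is expanded into one placeholder per value. The sketch below imitates that expansion with naive string replacement in plain Java. `InClauseExpansionSketch` and `expand` are hypothetical names, and this is only an illustration of the observed result, not how Spring parses named parameters.

```java
import java.util.Collections;
import java.util.List;

// Naive illustration of expanding a named IN-clause parameter; names are hypothetical.
class InClauseExpansionSketch {
    // Replaces ":paramName" with one "?" per value, e.g. (:id) becomes (?, ?, ?).
    // Plain text replacement: it does not handle quoting, prefix clashes or empty lists.
    static String expand(String sql, String paramName, List<?> values) {
        String placeholders = String.join(", ", Collections.nCopies(values.size(), "?"));
        return sql.replace(":" + paramName, placeholders);
    }

    public static void main(String[] args) {
        String updateSql = "update polling_message set process_status = 1 where id in (:id)";

        // Three polled rows (ids 1, 3 and 4) lead to three placeholders.
        System.out.println(expand(updateSql, "id", List.of(1, 3, 4)));
        // prints: update polling_message set process_status = 1 where id in (?, ?, ?)
    }
}
```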