CLOVER🍀

That was when it all began.

Spring IntegrationのGatewayを試す

これは、なにをしたくて書いたもの?

少し前に、Spring Integrationを試してみました。

Spring Integrationを試してみる - CLOVER🍀

他にドキュメントを読んでいて、Gatewayを使ってみないと全体のイメージが掴めない気がするので、1回試してみようかなと。

Gateway

Gatewayは、Overview内にも少し出てきます。request - replyな形式をサポートするようです。

request-reply MessagingGatewaySupport implementations (such as AmqpInboundGateway and AbstractWebServiceInboundGateway).

Spring Integration Overview / Finding Class Names for Java and DSL Configuration

もっと詳細に書かれている、Messaging Gatewayに関するドキュメントはこちら。

Messaging Endpoints / Messaging Gateways

Gatewayは、Spring Integrationによって提供するメッセージングAPIを隠するものだとされています。

A gateway hides the messaging API provided by Spring Integration. It lets your application’s business logic be unaware of the Spring Integration API.

こちらを見ると、インターフェースに対するプロキシを生成し、Spring Integrationを内部で呼び出す仕組みになるようです。

Spring Integration provides the GatewayProxyFactoryBean, which generates a proxy for any interface and internally invokes the gateway methods shown below. By using dependency injection, you can then expose the interface to your business methods.

Messaging Endpoints / Messaging Gateways / Enter the GatewayProxyFactoryBean

Messaging Endpoints / Messaging Gateways / Gateway XML Namespace Support

こうすると、Spring Integrationが使われていることを呼び出し元が意識しなくなる、ということですね。

With this configuration defined, the cafeService can now be injected into other beans, and the code that invokes the methods on that proxied instance of the Cafe interface has no awareness of the Spring Integration API. The general approach is similar to that of Spring Remoting (RMI, HttpInvoker, and so on).

よく似たアプローチを取っているものとして、RMIなどが挙げられています。

このあたりを見ると、裏では(Reply用の)Channelを使ったSpring Integrationの仕組みが動作するようですね。

Typically, you need not specify the default-reply-channel, since a Gateway auto-creates a temporary, anonymous reply channel, where it listens for the reply. However, some cases may prompt you to define a default-reply-channel (or reply-channel with adapter gateways, such as HTTP, JMS, and others).

Messaging Endpoints / Messaging Gateways / Setting the Default Reply Channel

内部的な動作が説明されているので、ちょっと見てみましょう。

  • Gatewayは、一時的なpoint-to-pointなリプライ用のChannelを作成する
    • 匿名であり、メッセージヘッダーが追加される
  • 明示的なdefault-reply-channelを指定する場合(Remote Adapter Gatewayと一緒にreply-channelを指定する場合)は、publish-subscribe Channelを指定できる
    • この場合、複数のSubscriberを指定可能
    • Spring Integrationは、一時的なreply-channelと明示的なdefault-reply-channelとの間にブリッジを作成する

For some background, we briefly discuss some of the inner workings of the gateway. A gateway creates a temporary point-to-point reply channel. It is anonymous and is added to the message headers with the name, replyChannel. When providing an explicit default-reply-channel (reply-channel with remote adapter gateways), you can point to a publish-subscribe channel, which is so named because you can add more than one subscriber to it. Internally, Spring Integration creates a bridge between the temporary replyChannel and the explicitly defined default-reply-channel.

また、リプライをGatewayだけでなく他のConsumerに送信したい場合は、サブスクライブを行うために明示的な名前付けされたChannelを
作る必要があります。そのChannelは、publish-subscribe-channelとして機能します

Suppose you want your reply to go not only to the gateway but also to some other consumer. In this case, you want two things: - A named channel to which you can subscribe - That channel to be a publish-subscribe-channel

Messaging Endpoints / Messaging Gateways / Setting the Default Reply Channel

と、書いていても掴めないところもあるので、あとは実際にソースコードを書いて動かしてみましょう。

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.3 2022-04-19
OpenJDK Runtime Environment (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 17.0.3+7-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.3, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-109-generic", arch: "amd64", family: "unix"

プロジェクトを作成する

まずは、Spring Bootプロジェクトを作成します。依存関係に、webintegrationを加えました。

$ curl -s https://start.spring.io/starter.tgz \
  -d bootVersion=2.6.7 \
  -d javaVersion=17 \
  -d name=integration-gateway-example \
  -d groupId=org.littlewings \
  -d artifactId=integration-gateway-example \
  -d version=0.0.1-SNAPSHOT \
  -d packageName=org.littlewings.spring.integration \
  -d dependencies=web,integration \
  -d baseDir=integration-gateway-example | tar zxvf -

プロジェクト内に移動。

$ cd integration-gateway-example

デフォルトで生成されるソースコードは削除しておきます。

$ rm src/main/java/org/littlewings/spring/integration/IntegrationGatewayExampleApplication.java src/test/java/org/littlewings/spring/integration/IntegrationGatewayExampleApplicationTests.java

Mavenの依存関係やプラグインの設定を見ておきます。

 <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

Spring Web MVCとSpring Integrationを指定すると、spring-integration-httpもついてくるんですね。

こちらのことですね。

HTTP Support

せっかくなので、こちらも使って簡単なサンプルアプリケーションを作成したいと思います。

とりあえず、mainクラスだけは用意しておきます。

src/main/java/org/littlewings/spring/integration/App.java

package org.littlewings.spring.integration;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

お題

お題は、Echoプログラムにしましょう。

こんな感じのクラスをリクエストとして受け取り、

src/main/java/org/littlewings/spring/integration/RequestMessage.java

package org.littlewings.spring.integration;

public class RequestMessage {
    String request;

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }
}

レスポンスをこちらで返すプログラムを作ることにします。

src/main/java/org/littlewings/spring/integration/ResponseMessage.java

package org.littlewings.spring.integration;

public class ResponseMessage {
    String response;

    public String getResponse() {
        return response;
    }

    public void setResponse(String response) {
        this.response = response;
    }
}

リクエストは、JSONで受け取るようにしましょう。

最初はGatewayをシンプルに使い、次にSpring IntegrationのHTTPサポートを使ったパターンを作っていきます。

Gatewayをシンプルに使う

まずは、Gatewayをシンプルに使ってみます。

こちらを見つつ。

Messaging Endpoints / Messaging Gateways / Gateway Configuration with Annotations and XML

Messaging Endpoints / Messaging Gateways / @MessagingGateway Annotation

@MessagingGatewayアノテーションと、@Gatewayアノテーションを付与したインターフェースを作ればよさそうです。

src/main/java/org/littlewings/spring/integration/simple/EchoService.java

package org.littlewings.spring.integration.simple;

import org.littlewings.spring.integration.RequestMessage;
import org.littlewings.spring.integration.ResponseMessage;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway
public interface EchoService {
    @Gateway(requestChannel = "echoRequestChannel")
    ResponseMessage execute(RequestMessage request);
}

request-channelはechoRequestChannelとしておきます。

GatewayProxyFactoryBeanを使って定義する方法もあるようですが。

Messaging Endpoints / Messaging Gateways / / Enter the GatewayProxyFactoryBean

今回は、定義したEchoServiceインターフェースをDIして使うことにするので、アノテーションを使って定義してコンポーネントスキャンして
もらうことにしましょう。

次に、IntegrationFlowを定義します。

src/main/java/org/littlewings/spring/integration/simple/EchoGatewayConfig.java

package org.littlewings.spring.integration.simple;

import org.littlewings.spring.integration.RequestMessage;
import org.littlewings.spring.integration.ResponseMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class EchoGatewayConfig {
    @Bean
    public IntegrationFlow reply() {
        return IntegrationFlows
                .from("echoRequestChannel")
                .<RequestMessage>handle((payload, header) -> {
                    Logger logger = LoggerFactory.getLogger(EchoGatewayConfig.class);
                    header.entrySet().forEach(entry -> logger.info("header: name = {}, value = {}", entry.getKey(), entry.getValue()));

                    String message = payload.getRequest();
                    ResponseMessage response = new ResponseMessage();
                    response.setResponse("★★★ " + message + " ★★★");
                    return response;
                })
                .get();
    }
}

先ほどのインターフェースで定義した、echoRequestChannelをサブスクライブして処理するGenericHandlerを定義しましょう。

Java DSL / Service Activators and the .handle() method

ヘッダーをログ出力して

                    Logger logger = LoggerFactory.getLogger(EchoGatewayConfig.class);
                    header.entrySet().forEach(entry -> logger.info("header: name = {}, value = {}", entry.getKey(), entry.getValue()));

リクエストの内容を加工して返すようにします。

                    String message = payload.getRequest();
                    ResponseMessage response = new ResponseMessage();
                    response.setResponse("★★★ " + message + " ★★★");
                    return response;

では、このインターフェースを使うRestControllerを定義します。

src/main/java/org/littlewings/spring/integration/simple/EchoController.java

package org.littlewings.spring.integration.simple;

import org.littlewings.spring.integration.RequestMessage;
import org.littlewings.spring.integration.ResponseMessage;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EchoController {
    EchoService echoService;

    public EchoController(EchoService echoService) {
        this.echoService = echoService;
    }

    @PostMapping("echo")
    public ResponseMessage echo(@RequestBody RequestMessage request) {
        return echoService.execute(request);
    }
}

では、動かしてみましょう。

$ mvn spring-boot:run

確認。

$ curl -H 'Content-Type: application/json' localhost:8080/echo -d '{"request": "Hello World"}'
{"response":"★★★ Hello World ★★★"}

RestControllerで受け取った内容が

    @PostMapping("echo")
    public ResponseMessage echo(@RequestBody RequestMessage request) {
        return echoService.execute(request);
    }

こちらで定義したGenericHandlerでハンドリングされているのがわかりますね。

    @Bean
    public IntegrationFlow reply() {
        return IntegrationFlows
                .from("echoRequestChannel")
                .<RequestMessage>handle((payload, header) -> {
                    Logger logger = LoggerFactory.getLogger(EchoGatewayConfig.class);
                    header.entrySet().forEach(entry -> logger.info("header: name = {}, value = {}", entry.getKey(), entry.getValue()));

                    String message = payload.getRequest();
                    ResponseMessage response = new ResponseMessage();
                    response.setResponse("★★★ " + message + " ★★★");
                    return response;
                })
                .get();
    }

ヘッダーは、こんな感じでログ出力されました。

2022-05-01 20:51:39.608  INFO 21766 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = replyChannel, value = org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@b9c6c95
2022-05-01 20:51:39.609  INFO 21766 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = errorChannel, value = org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@b9c6c95
2022-05-01 20:51:39.609  INFO 21766 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = id, value = 4834b4ac-ac57-0d4a-5f15-2631733b86d5
2022-05-01 20:51:39.609  INFO 21766 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = timestamp, value = 1651405899607

というわけで、RestControllerから見た時はふつうのインターフェースなのに、裏ではSpring Integrationが動作しているという仕掛けが
Gatewayだということが確認できました。

@RestController
public class EchoController {
    EchoService echoService;

    public EchoController(EchoService echoService) {
        this.echoService = echoService;
    }

    @PostMapping("echo")
    public ResponseMessage echo(@RequestBody RequestMessage request) {
        return echoService.execute(request);
    }
}

ここで、1度アプリケーションを停止しておきます。

HTTPサポートを使ってみる

次に、Spring IntegrationのHTTPサポートを使ってみましょう。

HTTP Support

今回は、自分でGatewayは作成しません。HTTPサポートが提供しているHttpRequestHandlingMessagingGatewayを使います。

HTTP Support / Http Inbound Components

RestControllerも作成せず、HttpRequestHandlingMessagingGatewayがリクエストを受け付け、レスポンスを返します。

なお、今回は使いませんがいわゆるHTTPクライアントに相当するHttpRequestExecutingMessageHandlerもあるようです。

HTTP Support / HTTP Outbound Components

では、Configurationを作成します。

src/main/java/org/littlewings/spring/integration/http/HttpGatewayConfig.java

package org.littlewings.spring.integration.http;

import org.littlewings.spring.integration.simple.EchoService;
import org.littlewings.spring.integration.RequestMessage;
import org.littlewings.spring.integration.ResponseMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.http.dsl.Http;

@Configuration
public class HttpGatewayConfig {
    @Bean
    public IntegrationFlow httpInbound() {
        return IntegrationFlows
                .from(
                        Http
                                .inboundGateway("http-gateway")
                                .requestMapping(mapping -> mapping.methods(HttpMethod.POST))
                                .requestPayloadType(RequestMessage.class)
                                .get()
                )
                .channel("httpInboundChannel")
                .get();
    }

    @Bean
    public IntegrationFlow handler() {
        return IntegrationFlows
                .from("httpInboundChannel")
                .<RequestMessage>handle((payload, header) -> {
                    Logger logger = LoggerFactory.getLogger(HttpGatewayConfig.class);
                    header.entrySet().forEach(entry -> logger.info("header: name = {}, value = {}", entry.getKey(), entry.getValue()));

                    String message = payload.getRequest();
                    ResponseMessage response = new ResponseMessage();
                    response.setResponse("★★★ " + message + " ★★★");
                    return response;
                })
                .get();
    }
}

リクエストを受け付けてレスポンスを返すGatewayは、この部分ですね。

    @Bean
    public IntegrationFlow httpInbound() {
        return IntegrationFlows
                .from(
                        Http
                                .inboundGateway("http-gateway")
                                .requestMapping(mapping -> mapping.methods(HttpMethod.POST))
                                .requestPayloadType(RequestMessage.class)
                                .get()
                )
                .channel("httpInboundChannel")
                .get();
    }

http-gatewayというパスにマッピングすることにしました。

HttpRequestHandlingMessagingGatewayは、Java DSLHttpというクラスを使って組み立てています。

Http (Spring Integration 5.5.11 API)

HTTP Support / Configuring HTTP Endpoints with Java

リクエストの内容を扱っている部分は、先ほどのシンプルにGatewayを作成した場合と同じように(GenericHandler)作成しました。

    @Bean
    public IntegrationFlow handler() {
        return IntegrationFlows
                .from("httpInboundChannel")
                .<RequestMessage>handle((payload, header) -> {
                    Logger logger = LoggerFactory.getLogger(HttpGatewayConfig.class);
                    header.entrySet().forEach(entry -> logger.info("header: name = {}, value = {}", entry.getKey(), entry.getValue()));

                    String message = payload.getRequest();
                    ResponseMessage response = new ResponseMessage();
                    response.setResponse("★★★ " + message + " ★★★");
                    return response;
                })
                .get();
    }

再度起動。

$ mvn spring-boot:run

確認。

$ curl -H 'Content-Type: application/json' localhost:8080/http-gateway -d '{"request": "Hello World"}'
{"response":"★★★ Hello World ★★★"}

先ほどと同じ結果になりましたね。

ログ出力されたヘッダーの情報。

2022-05-01 21:22:46.391  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = content-length, value = 26
2022-05-01 21:22:46.391  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = http_requestMethod, value = POST
2022-05-01 21:22:46.391  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = replyChannel, value = org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@95d3903
2022-05-01 21:22:46.391  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = errorChannel, value = org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@95d3903
2022-05-01 21:22:46.391  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = host, value = localhost:8080
2022-05-01 21:22:46.391  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = http_requestUrl, value = http://localhost:8080/http-gateway
2022-05-01 21:22:46.392  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = id, value = 0ee3cfdb-b4e3-7500-d571-8f39e973fee1
2022-05-01 21:22:46.392  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = contentType, value = application/json;charset=UTF-8
2022-05-01 21:22:46.392  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = user-agent, value = curl/7.68.0
2022-05-01 21:22:46.392  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = accept, value = */*
2022-05-01 21:22:46.392  INFO 21766 --- [nio-8080-exec-3] o.l.s.i.http.HttpGatewayConfig           : header: name = timestamp, value = 1651407766391

最後に少し捻りで、こちらのGenericHandlerを使った定義をコメントアウトして

    //@Bean
    public IntegrationFlow handler() {
        return IntegrationFlows
                .from("httpInboundChannel")
                .<RequestMessage>handle((payload, header) -> {
                    Logger logger = LoggerFactory.getLogger(HttpGatewayConfig.class);
                    header.entrySet().forEach(entry -> logger.info("header: name = {}, value = {}", entry.getKey(), entry.getValue()));

                    String message = payload.getRequest();
                    ResponseMessage response = new ResponseMessage();
                    response.setResponse("★★★ " + message + " ★★★");
                    return response;
                })
                .get();
    }

先ほど作成したインターフェースとGatewayを使ってみます。

    @Bean
    public IntegrationFlow delegate(EchoService echoService) {
        return IntegrationFlows
                .from("httpInboundChannel")
                .handle(echoService, "execute")
                .get();
    }

アプリケーションを起動。

$ mvn spring-boot:run

先ほどと同じ結果になりました。

$ curl -H 'Content-Type: application/json' localhost:8080/http-gateway -d '{"request": "Hello World"}'
{"response":"★★★ Hello World ★★★"}

ログは、最初に作成したConfigurationでのログになっていますね。

2022-05-01 21:34:47.562  INFO 24136 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = replyChannel, value = org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@56354fec
2022-05-01 21:34:47.563  INFO 24136 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = errorChannel, value = org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@56354fec
2022-05-01 21:34:47.564  INFO 24136 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = id, value = e710a00d-3ea4-1f55-38e9-8421c2fcec99
2022-05-01 21:34:47.564  INFO 24136 --- [nio-8080-exec-1] o.l.s.i.simple.EchoGatewayConfig         : header: name = timestamp, value = 1651408487562

とりあえず、こんなところでしょうか。

まとめ

Spring IntegrationのGatewayを試してみました。

リクエスト、レスポンスを実現できるものだということは前回ドキュメントを見て雰囲気はわかっていましたが、いざ使おうとするとなかなか
入り方がわからなくて苦労しました…。

とりあえず、自分でGatewayを作ってみるのと、すでに実装済みのGatewayを使ってみるパターンはできたのでよしとしましょう。