CLOVER馃崁

That was when it all began.

Spring WebFlux銇roxy銈点兗銉愩兗銈掓浉銇勩仸銇裤倠

銇°倗銇c仺銇椼仧銇婇銇屻亗銈娿伨銇椼仸銆丼pring WebFlux銇Proxy銈点兗銉銉笺倰鏇搞亜銇︺伩銇俱仐銇熴

銇婇銇併亾銈撱仾鎰熴仒銇с

  • Reverse Proxy
  • 绶┿亜Proxy銈点兗銉銉硷紙X-Forwarded銆溿仺銇嬨伅姘椼伀銇椼仾銇勶級
  • 銇汇伡銇伀銈傝冦亪銇氥併儛銉冦偗銈ㄣ兂銉夈伕銇儶銈銈ㄣ偣銉堛仺銉偣銉濄兂銈广伄鍐呭銈掕虎閫併仚銈
  • GET銇≒OST銈掋仺銈娿亗銇堛仛瀵惧繙

Spring WebFlux锛媁ebClient銇с併儙銉銉栥儹銉冦偔銉炽偘銇浉銇勩仸銇裤倛銇嗐仺銇勩亞瑭便仹銇欍

Web on Reactive Stack

Proxy銈点兗銉銉笺伄銈广儦銉冦偗銇ㄣ仐銇︺併仢銈屻仾銈娿伀闋戝嫉銈嶃亞銇ㄦ濄仯銇熴倝Jetty銇傘仧銈娿倰鍙傝冦伀銇欍倠銇ㄣ倛銇勩伄銇с伅銇亜銇с仐銈囥亞銇嬨
https://github.com/eclipse/jetty.project/blob/jetty-9.4.9.v20180320/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ProxyServlet.java
https://github.com/eclipse/jetty.project/blob/jetty-9.4.9.v20180320/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AbstractProxyServlet.java

鐠板

浠婂洖銇嫊浣滅挵澧冦伅銆併亾銈撱仾鎰熴仒銇с

$ java -version
openjdk version "1.8.0_162"
OpenJDK Runtime Environment (build 1.8.0_162-8u162-b12-0ubuntu0.16.04.2-b12)
OpenJDK 64-Bit Server VM (build 25.162-b12, mixed mode)

$ mvn -version
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: /usr/local/maven3/current
Java version: 1.8.0_162, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.4.0-104-generic", arch: "amd64", family: "unix"

銉愩儍銈偍銉炽儔銇偟銉銉愩兗

銉椼儹銈偡鍏堛伄銉愩儍銈偍銉炽儔銇偟銉銉愩兗銇佺啊鍗樸伀Servlet銇ф浉銇忋亾銇ㄣ伀銇椼伨銇欍

Maven渚濆瓨闁總銆

        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>

鍙椼亼鍙栥仯銇熴儶銈銈ㄣ偣銉堛伄鍐呭銈掋丅ody銇浉銇嶅嚭銇欍倛銇嗐伀銇椼伨銇欍
src/main/java/org/littlewings/servlet/SimpleServlet.java

package org.littlewings.servlet;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet("/*")
public class SimpleServlet extends HttpServlet {
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
        execute(request, response);
    }

    @Override
    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
        execute(request, response);
    }

    void execute(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
        request.setCharacterEncoding("UTF-8");

        response.setContentType("text/plain");
        response.setCharacterEncoding("UTF-8");

        response.addHeader("X-Custom-Header", "Header-Value");

        PrintWriter writer = response.getWriter();

        writer.println("========================================");
        writer.println("Request Method:");
        writer.println("  " + request.getMethod());

        writer.println("========================================");
        writer.println("Request URI:");
        String queryString = request.getQueryString();
        if (queryString != null) {
            writer.println("  " + request.getRequestURI() + "?" + queryString);
        } else {
            writer.println("  " + request.getRequestURI());
        }

        writer.println("========================================");
        writer.println("Request Headers:");
        Collections.list(request.getHeaderNames()).forEach(name -> {
            List<String> values = Collections.list(request.getHeaders(name));
            writer.println("  " + name + ":" + values);
        });

        writer.println("========================================");
        writer.println("Request Body:");
        BufferedReader reader = request.getReader();
        int c;
        while ((c = reader.read()) != -1) {
            writer.print((char)c);
        }
    }
}

銇傘仺銆併伈銇ㄣ仱銇犮亼銆佺嫭鑷伄銉樸儍銉銉笺倰鍏ャ倢銇︺亜銇熴倞銆

        response.addHeader("X-Custom-Header", "Header-Value");

銇俱亗銆併亜銇氥倢銈傘偗銉┿偆銈€兂銉堛亱銈夈伄銉偗銈ㄣ偣銉堛伄鍐呭銇屽埌閬斻仐銇︺亜銈嬨亱銆併偗銉┿偆銈€兂銉堛伕銉愩儍銈偍銉炽儔銇偟銉銉愩兗
銉偣銉濄兂銈广亴杌㈤併仹銇嶃仸銇勩倠銇嬨併仺銇勩亞纰鸿獚銇唴瀹广仹銇欍伃銆

銇撱倢銈掋儜銉冦偙銉笺偢銉炽偘銇椼仸銆佷粖鍥炪伅Apache Tomcat 9.0.7銇儑銉椼儹銈ゃ仐銇﹁捣鍕曘仌銇涖仸銇娿亶銇俱仚銆

銇撱伄寰屼綔鎴愩仚銈嬨Proxy銈点兗銉銉笺仺鍚屻仒銉涖偣銉堛仹璧峰嫊銇欍倠銇仹銆佸埄鐢ㄣ仚銈嬨儩銉笺儓銇18080銇ㄣ仐銇︺亰銇嶃伨銇椼仧銆

婧栧倷

銇с伅銆丼pring WebFlux銇т綔銈Proxy銈点兗銉銉笺伄婧栧倷銈掋

Maven渚濆瓨闁總銇併亾銇°倝銆

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.0.1.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
    </dependencies>

Proxy Controller

Proxy銇ㄣ仐銇︽尟銈嬭垶銇嗐儣銉偘銉┿儬銇丷estController銇ㄣ仐銇︿綔鎴愩仐銇俱仐銇熴

src/main/java/org/littlewings/spring/webflux/proxy/ProxyController.java

package org.littlewings.spring.webflux.proxy;

import java.net.ConnectException;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class ProxyController {
    @RequestMapping(value = "/**", method = {RequestMethod.GET, RequestMethod.POST})
    public Flux<DataBuffer> proxy(ServerWebExchange exchange) {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();

        String remoteHost = "localhost";
        int remotePort = 18080;

        WebClient proxyClient =
                WebClient
                        .builder()
                        .baseUrl(String.format("http://%s:%d", remoteHost, remotePort))
                        .build();

        Mono<ClientResponse> monoRemoteResponse =
                proxyClient
                        .method(request.getMethod())
                        .uri(uriBuilder ->
                                uriBuilder
                                        .path(request.getPath().value())
                                        .queryParams(request.getQueryParams())
                                        .build()
                        )
                        .headers(headers -> {
                            HttpHeaders requestHeaders = request.getHeaders();
                            requestHeaders.forEach(headers::addAll);
                        })
                        .body(request.getBody(), DataBuffer.class)
                        .exchange();

        Flux<ClientResponse> remoteResponse =
                Flux.from(monoRemoteResponse);

        return remoteResponse
                .flatMap(remoteClientResponse -> {
                    response.setStatusCode(remoteClientResponse.statusCode());
                    ClientResponse.Headers remoteHeaders = remoteClientResponse.headers();
                    remoteHeaders.asHttpHeaders().forEach((name, values) -> {
                        if ("Content-Type".equalsIgnoreCase(name)) {
                            // 銉囥儠銈┿儷銉堛伄銆宼ext/event-stream銆嶃倰娼般仚
                            response.getHeaders().set(name, values.get(0));
                        } else {
                            response.getHeaders().addAll(name, values);
                        }
                    });

                    return remoteClientResponse.bodyToFlux(DataBuffer.class);
                })
                .doOnError(throwable -> {
                    if (throwable.getCause() != null && throwable.getCause() instanceof ConnectException) {
                        response.setStatusCode(HttpStatus.BAD_GATEWAY);
                        response.getHeaders().set("Content-Type", "text/plain");
                    } else {
                        response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
                        response.getHeaders().set("Content-Type", "text/plain");
                    }
                })
                .onErrorReturn(response.bufferFactory().wrap(new byte[0]));
    }
}

銉°偨銉冦儔銇ㄣ仐銇︺伅銆丂RequestMapping銇value銈掋/**銆嶃仺銇椼仸銇┿伄銉戙偣銇с倐鍙椼亼浠樸亼銈嬨倛銇嗐伀銇椼仸銆佸紩鏁般伅ServerWebExchange銆佹埢銈婂ゃ伅Flux銇ㄣ亜銇
鎰熴仒銇с

    @RequestMapping(value = "/**", method = {RequestMethod.GET, RequestMethod.POST})
    public Flux<DataBuffer> proxy(ServerWebExchange exchange) {

銈点兗銉愩兗銇ㄣ仐銇︿娇銇嗐儶銈銈ㄣ偣銉堛併儸銈广儩銉炽偣銇疭erverWebExchange銇嬨倝銇濄倢銇炪倢ServerHttpRequest銆丼erverHttpResponse銇ㄣ仐銇﹀彇寰椼仚銈嬨亾銇ㄣ亴銇с亶銇俱仚銆

        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();

銉偗銈ㄣ偣銉堣虎閫佸厛銇儧銈广儓銆併儩銉笺儓銈掋儥銉笺偣銇仐銇ebClient銈掍綔鎴愩仐銇俱仚銆

        String remoteHost = "localhost";
        int remotePort = 18080;

        WebClient proxyClient =
                WebClient
                        .builder()
                        .baseUrl(String.format("http://%s:%d", remoteHost, remotePort))
                        .build();

銇濄倢銇嬨倝銆丼erverHttpRequest銇唴瀹广倰WebClient銇岄佷俊銇欍倠銉偗銈ㄣ偣銉堛伄鍐呭銇偝銉斻兗銇椼仸銇勩亶銇俱仚銆傘儑銉笺偪銇佺壒銇亾銇撱仹澶夋洿銇椼仧銈娿仐銇亜銇仹
DataBuffer銇ㄣ仐銇﹀彈銇戝彇銈嬨倛銇嗐伀浣滄垚銆

        Mono<ClientResponse> monoRemoteResponse =
                proxyClient
                        .method(request.getMethod())
                        .uri(uriBuilder ->
                                uriBuilder
                                        .path(request.getPath().value())
                                        .queryParams(request.getQueryParams())
                                        .build()
                        )
                        .headers(headers -> {
                            HttpHeaders requestHeaders = request.getHeaders();
                            requestHeaders.forEach(headers::addAll);
                        })
                        .body(request.getBody(), DataBuffer.class)
                        .exchange();

WebClient銇娇銇勬柟銇併亾銇撱倰瑕嬨仱銇も︺
WebClient

銇撱亾銇ц繑銇c仸銇忋倠銇伅Mono銇伄銇с仚銇屻併仢銈屻仾銈娿伀銉囥兗銈裤亴澶с亶銇勩偙銉笺偣銈傝冦亪銈嬨仺Flux銇仐銇熸柟銇屻亜銇勩伄銇с伅銇ㄣ亜銇嗘皸銇屻仐銇熴伄銇с
Mono銈扚lux銇仐銇︺亰銇嶃伨銇欍

        Flux<ClientResponse> remoteResponse =
                Flux.from(monoRemoteResponse);

銇傘仺銇併儸銈广儩銉炽偣銇唴瀹广倰銈儵銈ゃ偄銉炽儓銇繑銇欍倛銇嗐伀浣滄垚銆

        return remoteResponse
                .flatMap(remoteClientResponse -> {
                    response.setStatusCode(remoteClientResponse.statusCode());
                    ClientResponse.Headers remoteHeaders = remoteClientResponse.headers();
                    remoteHeaders.asHttpHeaders().forEach((name, values) -> {
                        if ("Content-Type".equalsIgnoreCase(name)) {
                            // 銉囥儠銈┿儷銉堛伄銆宼ext/event-stream銆嶃倰娼般仚
                            response.getHeaders().set(name, values.get(0));
                        } else {
                            response.getHeaders().addAll(name, values);
                        }
                    });

                    return remoteClientResponse.bodyToFlux(DataBuffer.class);
                })
                .doOnError(throwable -> {
                    if (throwable.getCause() != null && throwable.getCause() instanceof ConnectException) {
                        response.setStatusCode(HttpStatus.BAD_GATEWAY);
                        response.getHeaders().set("Content-Type", "text/plain");
                    } else {
                        response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
                        response.getHeaders().set("Content-Type", "text/plain");
                    }
                })
                .onErrorReturn(response.bufferFactory().wrap(new byte[0]));

銉樸儍銉銉笺伀銇ゃ亜銇︺伅銆併孋ontent-Type銆嶃伄銇縮et銇欍倠銈堛亞銇仐銇︺併亗銇ㄣ伅銇层仧銇欍倝addAll銆

                    response.setStatusCode(remoteClientResponse.statusCode());
                    ClientResponse.Headers remoteHeaders = remoteClientResponse.headers();
                    remoteHeaders.asHttpHeaders().forEach((name, values) -> {
                        if ("Content-Type".equalsIgnoreCase(name)) {
                            // 銉囥儠銈┿儷銉堛伄銆宼ext/event-stream銆嶃倰娼般仚
                            response.getHeaders().set(name, values.get(0));
                        } else {
                            response.getHeaders().addAll(name, values);
                        }
                    });

銉囥兗銈裤伅銆丏ataBuffer銇瓼lux銇ㄣ仐銇﹁繑銇欍倛銇嗐伀銇椼伨銇椼仧銆

                    return remoteClientResponse.bodyToFlux(DataBuffer.class);

銈ㄣ儵銉笺偙銉笺偣銇仱銇勩仸銇佺啊鍗樸伀娓堛伨銇涖仸銇勩伨銇欌︺

璧峰嫊銈儵銈

Spring Boot銈€儣銉偙銉笺偡銉с兂銇ㄣ仐銇︺伄璧峰嫊銈儵銈广伅銆併亾銈撱仾鎰熴仒銇х啊鍗樸伀銆
src/main/java/org/littlewings/spring/webflux/proxy/App.java

package org.littlewings.spring.webflux.proxy;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

纰鸿獚

銇濄倢銇с伅銆佺⒑瑾嶃仐銇︺伩銇俱仐銈囥亞銆

Proxy銇ㄣ仐銇︿綔鎴愩仐銇焀ebFlux銈掍娇銇c仧銈€儣銉偙銉笺偡銉с兂銆佹渶鍒濄伀浣溿仯銇Servlet銈掋儑銉椼儹銈ゃ仐銇Apache Tomcat銇捣鍕曟笀銇裤仺銇椼伨銇欍

GET銆

$ curl -i -H 'X-Test-Header: sample1' http://localhost:8080/path1/path2?query-param=value
HTTP/1.1 200 OK
Content-Type: text/plain;charset=UTF-8
X-Custom-Header: Header-Value
Content-Length: 380
Date: Sun, 08 Apr 2018 12:13:59 GMT

========================================
Request Method:
  GET
========================================
Request URI:
  /path1/path2?query-param=value
========================================
Request Headers:
  accept-encoding:[gzip]
  host:[localhost:8080]
  user-agent:[curl/7.47.0]
  accept:[*/*]
  x-test-header:[sample1]
========================================
Request Body:

POST銆

$ curl -i -H 'X-Test-Header: sample1' -H 'Content-Type: application/json' http://localhost:8080/path1/path2?query-param=value -d '{"param": "json-value"}'
HTTP/1.1 200 OK
Content-Type: text/plain;charset=UTF-8
X-Custom-Header: Header-Value
Content-Length: 460
Date: Sun, 08 Apr 2018 12:16:18 GMT

========================================
Request Method:
  POST
========================================
Request URI:
  /path1/path2?query-param=value
========================================
Request Headers:
  accept-encoding:[gzip]
  host:[localhost:8080]
  user-agent:[curl/7.47.0]
  accept:[*/*]
  x-test-header:[sample1]
  content-type:[application/json]
  content-length:[23]
========================================
Request Body:
{"param": "json-value"}

鍕曘亜銇︺仢銇嗐仹銇欍伃銆

銇俱仺銈

Spring WebFlux锛媁ebClient銈掍娇銇c仸銆佺啊鍗樸仾Proxy銈点兗銉銉笺倰鏇搞亜銇︺伩銇俱仐銇熴

瀹熴伅Spring WebFlux锛圫pring Boot 2锛Spring Framework 5锛夈倰浣裤亞銇伅鍒濄倎銇︺仩銇c仧銇仹銆併亼銇c亾銇嗗媺寮枫伀銇倞銇俱仐銇熴

銈優銈

鏈鍒濄併亾銇亰椤屻倰Reactor Netty鍗樹綋銇с倓銈嶃亞銇ㄣ仐銇熴伄銇с仚銇屻佽浜嬨伀鎸姌銇椼伨銇椼仧鈥︺

Handler銈掋仼銇嗘浉銇勩仧銈夈亜銇勩伄銇嬨偆銉炪偆銉併倛銇忓垎銇嬨倝銇亸銇︺佹墦闁嬬瓥銇ㄣ仐銇pring WebFlux銇Щ銇c仧銈夈伨銇傘亗銇c仌銈娿仺
銇嗐伨銇忋亜銇嶃伨銇椼仸鈥︺

杩姐亞銇倝銆併亾銇亗銇熴倞銇伄銇с仐銈囥亞銇
https://github.com/spring-projects/spring-framework/blob/v5.0.5.RELEASE/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java
https://github.com/spring-projects/spring-framework/blob/v5.0.5.RELEASE/spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java

鍙傝冿級
はじめてのSpring WebFlux (その1.5 - Spring Bootを使わずSpring WebFluxをマニュアルでBootstrap)