CLOVER🍀

That was when it all began.

WildFly 35(RESTEasy)でServer-Sent Events(SSE)を試す

これは、なにをしたくて書いたもの?

Jakarta RESTful Web Services(以降JAX-RS)ではServer-Sent Events(SSE)が扱えるようなのですが、そういえば見かけたことは
あるものの試したことがなかったので簡単に使ってみることにしました。

WebSocketは扱っていたんですけどね。

UndertowでWebSocketを使って遊ぶ - CLOVER🍀

Quarkus × WebSocketで遊ぶ - CLOVER🍀

Server-Sent Events(SSE)

そもそもServer-Sent Eventsとは?というところからですが、これはサーバーがクライアントにメッセージをプッシュする
仕組みのことです。

サーバー送信イベント - Web API | MDN

ここで扱うメッセージはイベント、データと呼ばれるようです。

WHATWGによる仕様書はこちらです。

HTML Standard

ServerとSentの間に「-」が入るんですね。

Jakarta RESTful Web ServicesでのServer-Sent Eventsへの対応

Jakarta RESTful Web Services(JAX-RS)では、バージョン2.1からServer-Sent Eventsに対応しているようです。

Jakarta RESTful Web Services / Server-Sent Events

サーバーとクライアントの両方でサポートされています。ブロードキャストもできるようです。

最初のSSEレスポンス以外は少し異なるとされているものの、JAX-RSのパイプラインを通してSSEイベントは送信される
ようです。

The initial SSE response, which may only include the HTTP headers, is processed using the standard JAX-RS pipeline as described in Appendix Processing Pipeline. Each subsequent SSE event may include a different payload and thus require the use of a specific message body writer. Note that since this use case differs slightly from the normal JAX-RS pipeline, implementations SHOULD NOT call entity interceptors on each individual event.

Jakarta RESTful Web Services / Processing Pipeline

JAX-RSのSSEに関するパッケージはこちら。

jakarta.ws.rs.sse (Jakarta RESTful WS API 3.1.0 API)

サーバー側で主に使うAPIはこちら。

このうち、SseJakarta Contexts and Dependency Injection(CDI)を使ったインジェクションが可能です。

SseEventSinkについては@Contextで扱う必要があります。

JAX-RS(Jakarta RESTful Web Services) 3.1.0で、Contextアノテーションの代わりにCDIが推奨されるようになっていたという話 - CLOVER🍀

OutboundSseEventおよびSseEventがSSEにおける「イベント」に相当し、各プロパティの詳しい意味はWHATWG
仕様書を見た方がよいでしょうね。

HTML Standard

SSEを扱う時は、基本的にスレッドプールとセットで利用することになると思います。仕様書内にもExecutorService
影が見えますね。

クライアント側で主に使うAPIはこちら。

JAX-RSの実装であるRESTEasyおよびJerseyのSSEに関するドキュメントはこちら。

RESTEasyに関しては、リアクティブなAPIを使うことで自然な形でSSEに対応できたりします。

今回は、WildFly 35とRESTEasyでSSEを試してみようと思います。

環境

今回の環境はこちら。

$ java --version
openjdk 21.0.6 2025-01-21
OpenJDK Runtime Environment (build 21.0.6+7-Ubuntu-124.04.1)
OpenJDK 64-Bit Server VM (build 21.0.6+7-Ubuntu-124.04.1, mixed mode, sharing)


$ mvn --version
Apache Maven 3.9.9 (8e8579a9e76f7d015ee5ec7bfcdc97d260186937)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 21.0.6, vendor: Ubuntu, runtime: /usr/lib/jvm/java-21-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "6.8.0-53-generic", arch: "amd64", family: "unix"

お題

こんな感じでクライアントはcurl、サーバーはWildFlyとして、単一のクライアントに対してSSEを行うパターンと複数の
クライアントに対してブロードキャストするパターンを試してみたいと思います。

flowchart LR
    A[curl] --> |HTTP/single| W[WildFLy]
    subgraph "Broadcast Group"
      B[curl] --> |HTTP/broadcast| W_dummy
      C[curl] --> |HTTP/broadcast| W_dummy
      D[curl] --> |HTTP/register| W_dummy
    end

    W_dummy(( )) --> W

ブロードキャストするパターンは2つのクライアントがメッセージの受信用で、あとひとつはメッセージの送信用です。
送信されたメッセージを、2つのクライアントがSSEを使って受信します。

JAX-RSでSSEを扱う

では、SSEを扱うアプリケーションを作成します。

Maven依存関係など。packagingwarです。

    <properties>
        <maven.compiler.release>21</maven.compiler.release>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.wildfly.bom</groupId>
                <artifactId>wildfly-ee-with-tools</artifactId>
                <version>35.0.1.Final</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>jakarta.servlet</groupId>
            <artifactId>jakarta.servlet-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>jakarta.ws.rs</groupId>
            <artifactId>jakarta.ws.rs-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>jakarta.enterprise</groupId>
            <artifactId>jakarta.enterprise.cdi-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>jakarta.inject</groupId>
            <artifactId>jakarta.inject-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>jakarta.enterprise.concurrent</groupId>
            <artifactId>jakarta.enterprise.concurrent-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.jboss.logging</groupId>
            <artifactId>jboss-logging</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>ROOT</finalName>
        <plugins>
            <plugin>
                <groupId>org.wildfly.plugins</groupId>
                <artifactId>wildfly-maven-plugin</artifactId>
                <version>5.1.2.Final</version>
                <executions>
                    <execution>
                        <id>package</id>
                        <goals>
                            <goal>package</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <overwrite-provisioned-server>true</overwrite-provisioned-server>
                    <discover-provisioning-info>
                        <version>35.0.1.Final</version>
                    </discover-provisioning-info>
                </configuration>
            </plugin>
        </plugins>
    </build>

依存関係はひとまずJAX-RSがあればいいのですが、Sseをインジェクションする関係上CDIがあったり、スレッドプールが
必要になるのでJakarta Concurrencyが入ったりしています。

JAX-RSの有効化。

src/main/java/org/littlewings/jaxrs/sse/RestApplication.java

package org.littlewings.jaxrs.sse;

import jakarta.ws.rs.ApplicationPath;
import jakarta.ws.rs.core.Application;

@ApplicationPath("/")
public class RestApplication extends Application {
}

SSEを扱うJAX-RSリソースクラス。

src/main/java/org/littlewings/jaxrs/sse/SseResource.java

package org.littlewings.jaxrs.sse;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import jakarta.enterprise.concurrent.ManagedExecutorService;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import jakarta.ws.rs.sse.SseBroadcaster;
import jakarta.ws.rs.sse.SseEventSink;
import org.jboss.logging.Logger;

@Path("/sse")
@ApplicationScoped
public class SseResource {
    private Logger logger = Logger.getLogger(SseResource.class);

    @Inject
    private Sse sse;

    private SseBroadcaster sseBroadcaster;

    @Resource
    private ManagedExecutorService executorService;

    @PostConstruct
    void init() {
        sseBroadcaster = sse.newBroadcaster();
    }

    @GET
    @Path("/one-to-one")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void ontToOne(@Context SseEventSink sseEventSink) {
        executorService.execute(() -> {
            try (SseEventSink sink = sseEventSink) {
                IntStream.rangeClosed(1, 10).forEach(i -> {
                    OutboundSseEvent event =
                            sse.newEventBuilder()
                                    .name(String.format("countup message%d", i))
                                    .data(String.format("message%d", i))
                                    .build();
                    sseEventSink.send(event);
                    sleep();
                });

                sseEventSink.send(sse.newEvent("last message!!"));
            }

            logger.info("send events finish");
        });

        logger.info("resource method end");
    }

    @GET
    @Path("/register")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void register(@Context SseEventSink sseEventSink) {
        sseEventSink.send(sse.newEvent("welcome!!"));
        sseBroadcaster.register(sseEventSink);

        logger.info("client registered");
    }

    @POST
    @Path("/broadcast")
    @Consumes(MediaType.APPLICATION_JSON)
    public void broadcast(Map<String, String> body) {
        sseBroadcaster.broadcast(sse.newEvent(body.get("message")));

        logger.info("send broadcast message");
    }

    private void sleep() {
        try {
            TimeUnit.SECONDS.sleep(1L);
        } catch (InterruptedException e) {
            // ignore
        }
    }
}

それぞれのポイントを、動作確認とともに見ていきましょう。

SSEを扱うにはSseが必要になるのですが、これはCDI管理Beanとして登録されているので@Injectアノテーション
インジェクションできます。

    @Inject
    private Sse sse;

このSseは、イベントを作成するのに使います。このSseはスレッドセーフです。

Sse (Jakarta RESTful WS API 3.1.0 API)

単一のクライアント向け

単一のクライアント向けのリソースメソッド。

    @GET
    @Path("/one-to-one")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void ontToOne(@Context SseEventSink sseEventSink) {
        String currentThreadName = Thread.currentThread().getName();

        executorService.execute(() -> {
            try (SseEventSink sink = sseEventSink) {
                IntStream.rangeClosed(1, 10).forEach(i -> {
                    OutboundSseEvent event =
                            sse.newEventBuilder()
                                    .name(String.format("countup message%d", i))
                                    .data(String.format("message%d", i))
                                    .build();
                    sseEventSink.send(event);
                    sleep();
                });

                sseEventSink.send(sse.newEvent("last message!!"));
            }

            logger.info("send events finish");
        });

        logger.info("resource method end");
    }

SSEは非同期処理となり、特に単一のクライアント向けのものはリソースメソッドからクライアントにイベントを送信する部分は
スレッドを介して送信することが多いようです。

この部分ですね。

        executorService.execute(() -> {
            try (SseEventSink sink = sseEventSink) {
                IntStream.rangeClosed(1, 10).forEach(i -> {
                    OutboundSseEvent event =
                            sse.newEventBuilder()
                                    .name(String.format("countup message%d", i))
                                    .data(String.format("message%d", i))
                                    .build();
                    sseEventSink.send(event);
                    sleep();
                });

                sseEventSink.send(sse.newEvent("last message!!"));
            }

というかスレッドを介さずに処理を動かすと、単にリソースメソッドのレスポンスを小出しに書き出しているだけの
同期処理になります。

クライアントにイベントを送信するにはSseEventSinkを使う必要があるのですが、これはCDI管理Beanになっていないので
従来どおり@Contextでメソッドの引数として受け取ります。

    public void ontToOne(@Context SseEventSink sseEventSink) {

SseEventSinkはクローズが必要です。またこちらもスレッドセーフです。

SseEventSink (Jakarta RESTful WS API 3.1.0 API)

イベントの組み立てにはSseを使います。

以下のようにOutboundSseEvent.Builderを使って組み立てる方法とSse#newEventで簡易的に組み立てる方法があります。

                    OutboundSseEvent event =
                            sse.newEventBuilder()
                                    .name(String.format("countup message%d", i))
                                    .data(String.format("message%d", i))
                                    .build();
                    sseEventSink.send(event);


                sseEventSink.send(sse.newEvent("last message!!"));

Sse#newEventで指定できるのはdataおよびnameのみですね。

組み立てたイベントを送信するには、SseEventSink#sendを使います。仕様書などを見ても戻り値は無視されていますが、
このメソッドの戻り値はCompletionStageです。この戻り値を扱って後続の処理を行うこともあるでしょうね。

そしてスレッドプールにはJakarta ConcurrencyのManagedExecutorServiceを利用しました。

    @Resource
    private ManagedExecutorService executorService;

Jakarta Concurrency

Developer Guide / EE Concurrency Utilities / Managed Executor Service

これで、リソースメソッドを扱うスレッドがSSEに縛られなくなります。

イベントの送信が非同期になっていることがわかりやすくなるように、スリープとログ出力を入れています。

        logger.info("resource method end");

このあたりもですね。

                    sleep();
                });

                sseEventSink.send(sse.newEvent("last message!!"));

ここで1度動作確認してみましょうか。

$ mvn wildfly:run

curlでアクセスします。

$ curl -i localhost:8080/sse/one-to-one

すると、まずWildFly側にログが出力されます。

18:03:57,706 INFO  [org.littlewings.jaxrs.sse.SseResource] (default task-1) resource method end

curl側には1秒おきにメッセージが表示され、最後のメッセージを受信したら終了します。

HTTP/1.1 200 OK
Connection: keep-alive
Transfer-Encoding: chunked
Content-Type: text/event-stream
Date: Sat, 15 Feb 2025 09:03:57 GMT



event: countup message1
data: message1

event: countup message2
data: message2

event: countup message3
data: message3

event: countup message4
data: message4

event: countup message5
data: message5

event: countup message6
data: message6

event: countup message7
data: message7

event: countup message8
data: message8

event: countup message9
data: message9

event: countup message10
data: message10

data: last message!!

これが終わると、WildFly側にもイベント送信が完了したことを表すログが出力されます。

18:04:07,741 INFO  [org.littlewings.jaxrs.sse.SseResource] (EE-ManagedExecutorService-default-Thread-1) send events finish

リソースメソッド自体はすぐに終了し、イベント送信は別スレッドで行われることが確認できました。

ブロードキャストする

最後はブロードキャストです。

イベントを受け取る側は、こちらのリソースメソッドを呼び出します。

    @GET
    @Path("/register")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void register(@Context SseEventSink sseEventSink) {
        sseEventSink.send(sse.newEvent("welcome!!"));
        sseBroadcaster.register(sseEventSink);

        logger.info("client registered");
    }

SseEventSinkSseBroadcast#registerに登録します。

ここではスレッドプールは使いません。SseBroadcast#registerはすぐにリターンしますし、ここでイベントを送信する処理も
書かないからです。

一方で、イベントを送ることになるリソースメソッドはこちらです。

    @POST
    @Path("/broadcast")
    @Consumes(MediaType.APPLICATION_JSON)
    public void broadcast(Map<String, String> body) {
        sseBroadcaster.broadcast(sse.newEvent(body.get("message")));

        logger.info("send broadcast message");
    }

SseBroadcast#broadcastでイベントを送信することで、SseBroadcast#registerで登録したSseEventSinkに対して
イベントを送信できます。

そしてSseBroadcastSse#newBroadcasterで取得し、こちらもスレッドセーフです。

    @PostConstruct
    void init() {
        sseBroadcaster = sse.newBroadcaster();
    }

SseBroadcaster (Jakarta RESTful WS API 3.1.0 API)

では、動作確認してみます。

WildFlyを起動。

$ mvn wildfly:run

2つのクライアントを登録。

## ひとつ目
$ curl -i localhost:8080/sse/register
HTTP/1.1 200 OK
Connection: keep-alive
Transfer-Encoding: chunked
Content-Type: text/event-stream
Date: Sat, 15 Feb 2025 09:25:03 GMT



data: welcome!!


## 2つ目

$ curl -i localhost:8080/sse/register
HTTP/1.1 200 OK
Connection: keep-alive
Transfer-Encoding: chunked
Content-Type: text/event-stream
Date: Sat, 15 Feb 2025 09:25:06 GMT



data: welcome!!

クライアントはこのままメッセージを待ち続けます。

リソースメソッド側は、この時点で終了しています。

18:25:03,608 INFO  [org.littlewings.jaxrs.sse.SseResource] (default task-1) client registered
18:25:06,793 INFO  [org.littlewings.jaxrs.sse.SseResource] (default task-1) client registered

そして、別のクライアントからメッセージを送信してみます。

$ curl -i -XPOST -H 'Content-Type: application/json' localhost:8080/sse/broadcast -d '{"message": "Hello World"}'
HTTP/1.1 204 No Content
Date: Sat, 15 Feb 2025 09:25:47 GMT


$ curl -i -XPOST -H 'Content-Type: application/json' localhost:8080/sse/broadcast -d '{"message": "Hello WildFly"}'
HTTP/1.1 204 No Content
Date: Sat, 15 Feb 2025 09:26:27 GMT

それぞれのクライアントにメッセージが届きました。

## ひとつ目
data: Hello World

data: Hello WildFly


## 2つ目
data: Hello World

data: Hello WildFly

WildFly側のログ。

18:25:47,005 INFO  [org.littlewings.jaxrs.sse.SseResource] (default task-1) send broadcast message
18:26:27,595 INFO  [org.littlewings.jaxrs.sse.SseResource] (default task-1) send broadcast message

これで確認できましたね。

少しRESTEasyの実装を見てみる

RESTEasyの実装がどうなっているのか、少し見てみましょう。

SSEに関する実装は、こちらのパッケージにあるようです。

https://github.com/resteasy/resteasy/tree/6.2.11.Final/resteasy-core/src/main/java/org/jboss/resteasy/plugins/providers/sse

SseEventSinkの実装はこちら。

https://github.com/resteasy/resteasy/blob/6.2.11.Final/resteasy-core/src/main/java/org/jboss/resteasy/plugins/providers/sse/SseEventOutputImpl.java

Jakarta Servletの非同期処理の仕組みを使っているようです。

https://github.com/resteasy/resteasy/blob/6.2.11.Final/resteasy-core/src/main/java/org/jboss/resteasy/plugins/providers/sse/SseEventOutputImpl.java#L86-L87

Jakarta Servlet Specification / The Servlet Interface / Servlet Life Cycle / Asynchronous processing

SseEventSinkを作成しているのは、ContainerRequestFilterですね。

https://github.com/resteasy/resteasy/blob/6.2.11.Final/resteasy-core/src/main/java/org/jboss/resteasy/plugins/providers/sse/SseEventSinkInterceptor.java#L29

SSEかどうかの判定ですが

https://github.com/resteasy/resteasy/blob/6.2.11.Final/resteasy-core/src/main/java/org/jboss/resteasy/plugins/providers/sse/SseEventSinkInterceptor.java#L25

こちらで行っています。

https://github.com/resteasy/resteasy/blob/6.2.11.Final/resteasy-core/src/main/java/org/jboss/resteasy/core/ResourceMethodInvoker.java#L199-L239

メディアタイプがSSEであり、メソッドの引数にSseEventSinkがある場合にSSEと見なしています。またSSEをサポートする
リアクティブなアノテーション@Stream)が付与されている場合もSSEと見なすようです。

あとは非同期のOutputStreamに対してレスポンスを書き込みます。

https://github.com/resteasy/resteasy/blob/6.2.11.Final/resteasy-core/src/main/java/org/jboss/resteasy/plugins/providers/sse/SseEventOutputImpl.java#L146-L158

非同期として扱うのは、ここからのようですね。

https://github.com/resteasy/resteasy/blob/6.2.11.Final/resteasy-core/src/main/java/org/jboss/resteasy/plugins/server/servlet/Servlet3AsyncHttpRequest.java#L343

Jakarta ServletServletRequst#startAsyncを呼び出しているので、呼び出し側でスレッドを操作する必要があります。
このため今回はJakarta Concurrencyを使ったという感じですね。

HttpServletRequestHttpServletResponseもそのままのものを使います。

ブロードキャストの方を見てみましょう。

SseBroadcaster#registerは、キューに登録して完了のようです。

https://github.com/resteasy/resteasy/blob/6.2.11.Final/resteasy-core/src/main/java/org/jboss/resteasy/plugins/providers/sse/SseBroadcasterImpl.java#L122-L132

ではSseBroadcaster#broadcastはどうかというと、キューに登録されたSseEventSinkに対してイベントを送信します。

https://github.com/resteasy/resteasy/blob/6.2.11.Final/resteasy-core/src/main/java/org/jboss/resteasy/plugins/providers/sse/SseBroadcasterImpl.java#L134-L159

この時にCompletableFutureを使ってはいますが、CompletableFutureは構築済みの値を使って開始しているので
実質同期処理みたいですね。

これでおよそどういう動きをしているかは読めたかなと思います。

おわりに

WildFly 35j(というかRESTEasy)でServer-Sent Eventsを試してみました。

使い方自体はスレッドプールの扱いに気をつければそれほど難しくない気はしますが、裏の仕組みもちょっと追ってみたので
なかなか面白かったですね。

せっかくなので覚えておきましょう。