これは、なにをしたくて書いたもの?
Jakarta RESTful Web Services(以降JAX-RS)ではServer-Sent Events(SSE)が扱えるようなのですが、そういえば見かけたことは
あるものの試したことがなかったので簡単に使ってみることにしました。
WebSocketは扱っていたんですけどね。
UndertowでWebSocketを使って遊ぶ - CLOVER🍀
Quarkus × WebSocketで遊ぶ - CLOVER🍀
Server-Sent Events(SSE)
そもそもServer-Sent Eventsとは?というところからですが、これはサーバーがクライアントにメッセージをプッシュする
仕組みのことです。
ここで扱うメッセージはイベント、データと呼ばれるようです。
WHATWGによる仕様書はこちらです。
ServerとSentの間に「-」が入るんですね。
Jakarta RESTful Web ServicesでのServer-Sent Eventsへの対応
Jakarta RESTful Web Services(JAX-RS)では、バージョン2.1からServer-Sent Eventsに対応しているようです。
Jakarta RESTful Web Services / Server-Sent Events
サーバーとクライアントの両方でサポートされています。ブロードキャストもできるようです。
最初のSSEレスポンス以外は少し異なるとされているものの、JAX-RSのパイプラインを通してSSEイベントは送信される
ようです。
The initial SSE response, which may only include the HTTP headers, is processed using the standard JAX-RS pipeline as described in Appendix Processing Pipeline. Each subsequent SSE event may include a different payload and thus require the use of a specific message body writer. Note that since this use case differs slightly from the normal JAX-RS pipeline, implementations SHOULD NOT call entity interceptors on each individual event.


Jakarta RESTful Web Services / Processing Pipeline
JAX-RSのSSEに関するパッケージはこちら。
jakarta.ws.rs.sse (Jakarta RESTful WS API 3.1.0 API)
サーバー側で主に使うAPIはこちら。
- Sse (Jakarta RESTful WS API 3.1.0 API)
- SseEventSink (Jakarta RESTful WS API 3.1.0 API)
- https://jakarta.ee/specifications/restful-ws/3.1/apidocs/jakarta.ws.rs/jakarta/ws/rs/sse/outboundsseevent
このうち、SseはJakarta Contexts and Dependency Injection(CDI)を使ったインジェクションが可能です。
SseEventSinkについては@Contextで扱う必要があります。
JAX-RS(Jakarta RESTful Web Services) 3.1.0で、Contextアノテーションの代わりにCDIが推奨されるようになっていたという話 - CLOVER🍀
OutboundSseEventおよびSseEventがSSEにおける「イベント」に相当し、各プロパティの詳しい意味はWHATWGの
仕様書を見た方がよいでしょうね。
SSEを扱う時は、基本的にスレッドプールとセットで利用することになると思います。仕様書内にもExecutorServiceの
影が見えますね。
クライアント側で主に使うAPIはこちら。
- SseEventSource (Jakarta RESTful WS API 3.1.0 API)
- InboundSseEvent (Jakarta RESTful WS API 3.1.0 API)
JAX-RSの実装であるRESTEasyおよびJerseyのSSEに関するドキュメントはこちら。
- RESTEasy
- Jersey
RESTEasyに関しては、リアクティブなAPIを使うことで自然な形でSSEに対応できたりします。
今回は、WildFly 35とRESTEasyでSSEを試してみようと思います。
環境
今回の環境はこちら。
$ java --version openjdk 21.0.6 2025-01-21 OpenJDK Runtime Environment (build 21.0.6+7-Ubuntu-124.04.1) OpenJDK 64-Bit Server VM (build 21.0.6+7-Ubuntu-124.04.1, mixed mode, sharing) $ mvn --version Apache Maven 3.9.9 (8e8579a9e76f7d015ee5ec7bfcdc97d260186937) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 21.0.6, vendor: Ubuntu, runtime: /usr/lib/jvm/java-21-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "6.8.0-53-generic", arch: "amd64", family: "unix"
お題
こんな感じでクライアントはcurl、サーバーはWildFlyとして、単一のクライアントに対してSSEを行うパターンと複数の
クライアントに対してブロードキャストするパターンを試してみたいと思います。
flowchart LR
A[curl] --> |HTTP/single| W[WildFLy]
subgraph "Broadcast Group"
B[curl] --> |HTTP/broadcast| W_dummy
C[curl] --> |HTTP/broadcast| W_dummy
D[curl] --> |HTTP/register| W_dummy
end
W_dummy(( )) --> W
ブロードキャストするパターンは2つのクライアントがメッセージの受信用で、あとひとつはメッセージの送信用です。
送信されたメッセージを、2つのクライアントがSSEを使って受信します。
JAX-RSでSSEを扱う
では、SSEを扱うアプリケーションを作成します。
Maven依存関係など。packagingはwarです。
<properties> <maven.compiler.release>21</maven.compiler.release> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.wildfly.bom</groupId> <artifactId>wildfly-ee-with-tools</artifactId> <version>35.0.1.Final</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>jakarta.servlet</groupId> <artifactId>jakarta.servlet-api</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>jakarta.ws.rs</groupId> <artifactId>jakarta.ws.rs-api</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>jakarta.enterprise</groupId> <artifactId>jakarta.enterprise.cdi-api</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>jakarta.inject</groupId> <artifactId>jakarta.inject-api</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>jakarta.enterprise.concurrent</groupId> <artifactId>jakarta.enterprise.concurrent-api</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.jboss.logging</groupId> <artifactId>jboss-logging</artifactId> <scope>provided</scope> </dependency> </dependencies> <build> <finalName>ROOT</finalName> <plugins> <plugin> <groupId>org.wildfly.plugins</groupId> <artifactId>wildfly-maven-plugin</artifactId> <version>5.1.2.Final</version> <executions> <execution> <id>package</id> <goals> <goal>package</goal> </goals> </execution> </executions> <configuration> <overwrite-provisioned-server>true</overwrite-provisioned-server> <discover-provisioning-info> <version>35.0.1.Final</version> </discover-provisioning-info> </configuration> </plugin> </plugins> </build>
依存関係はひとまずJAX-RSがあればいいのですが、Sseをインジェクションする関係上CDIがあったり、スレッドプールが
必要になるのでJakarta Concurrencyが入ったりしています。
JAX-RSの有効化。
src/main/java/org/littlewings/jaxrs/sse/RestApplication.java
package org.littlewings.jaxrs.sse; import jakarta.ws.rs.ApplicationPath; import jakarta.ws.rs.core.Application; @ApplicationPath("/") public class RestApplication extends Application { }
SSEを扱うJAX-RSリソースクラス。
src/main/java/org/littlewings/jaxrs/sse/SseResource.java
package org.littlewings.jaxrs.sse; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import jakarta.enterprise.concurrent.ManagedExecutorService; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.ws.rs.Consumes; import jakarta.ws.rs.GET; import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.sse.OutboundSseEvent; import jakarta.ws.rs.sse.Sse; import jakarta.ws.rs.sse.SseBroadcaster; import jakarta.ws.rs.sse.SseEventSink; import org.jboss.logging.Logger; @Path("/sse") @ApplicationScoped public class SseResource { private Logger logger = Logger.getLogger(SseResource.class); @Inject private Sse sse; private SseBroadcaster sseBroadcaster; @Resource private ManagedExecutorService executorService; @PostConstruct void init() { sseBroadcaster = sse.newBroadcaster(); } @GET @Path("/one-to-one") @Produces(MediaType.SERVER_SENT_EVENTS) public void ontToOne(@Context SseEventSink sseEventSink) { executorService.execute(() -> { try (SseEventSink sink = sseEventSink) { IntStream.rangeClosed(1, 10).forEach(i -> { OutboundSseEvent event = sse.newEventBuilder() .name(String.format("countup message%d", i)) .data(String.format("message%d", i)) .build(); sseEventSink.send(event); sleep(); }); sseEventSink.send(sse.newEvent("last message!!")); } logger.info("send events finish"); }); logger.info("resource method end"); } @GET @Path("/register") @Produces(MediaType.SERVER_SENT_EVENTS) public void register(@Context SseEventSink sseEventSink) { sseEventSink.send(sse.newEvent("welcome!!")); sseBroadcaster.register(sseEventSink); logger.info("client registered"); } @POST @Path("/broadcast") @Consumes(MediaType.APPLICATION_JSON) public void broadcast(Map<String, String> body) { sseBroadcaster.broadcast(sse.newEvent(body.get("message"))); logger.info("send broadcast message"); } private void sleep() { try { TimeUnit.SECONDS.sleep(1L); } catch (InterruptedException e) { // ignore } } }
それぞれのポイントを、動作確認とともに見ていきましょう。
SSEを扱うにはSseが必要になるのですが、これはCDI管理Beanとして登録されているので@Injectアノテーションで
インジェクションできます。
@Inject private Sse sse;
このSseは、イベントを作成するのに使います。このSseはスレッドセーフです。
Sse (Jakarta RESTful WS API 3.1.0 API)
単一のクライアント向け
単一のクライアント向けのリソースメソッド。
@GET @Path("/one-to-one") @Produces(MediaType.SERVER_SENT_EVENTS) public void ontToOne(@Context SseEventSink sseEventSink) { String currentThreadName = Thread.currentThread().getName(); executorService.execute(() -> { try (SseEventSink sink = sseEventSink) { IntStream.rangeClosed(1, 10).forEach(i -> { OutboundSseEvent event = sse.newEventBuilder() .name(String.format("countup message%d", i)) .data(String.format("message%d", i)) .build(); sseEventSink.send(event); sleep(); }); sseEventSink.send(sse.newEvent("last message!!")); } logger.info("send events finish"); }); logger.info("resource method end"); }
SSEは非同期処理となり、特に単一のクライアント向けのものはリソースメソッドからクライアントにイベントを送信する部分は
スレッドを介して送信することが多いようです。
この部分ですね。
executorService.execute(() -> {
try (SseEventSink sink = sseEventSink) {
IntStream.rangeClosed(1, 10).forEach(i -> {
OutboundSseEvent event =
sse.newEventBuilder()
.name(String.format("countup message%d", i))
.data(String.format("message%d", i))
.build();
sseEventSink.send(event);
sleep();
});
sseEventSink.send(sse.newEvent("last message!!"));
}
というかスレッドを介さずに処理を動かすと、単にリソースメソッドのレスポンスを小出しに書き出しているだけの
同期処理になります。
クライアントにイベントを送信するにはSseEventSinkを使う必要があるのですが、これはCDI管理Beanになっていないので
従来どおり@Contextでメソッドの引数として受け取ります。
public void ontToOne(@Context SseEventSink sseEventSink) {
SseEventSinkはクローズが必要です。またこちらもスレッドセーフです。
SseEventSink (Jakarta RESTful WS API 3.1.0 API)
イベントの組み立てにはSseを使います。
以下のようにOutboundSseEvent.Builderを使って組み立てる方法とSse#newEventで簡易的に組み立てる方法があります。
OutboundSseEvent event =
sse.newEventBuilder()
.name(String.format("countup message%d", i))
.data(String.format("message%d", i))
.build();
sseEventSink.send(event);
sseEventSink.send(sse.newEvent("last message!!"));
Sse#newEventで指定できるのはdataおよびnameのみですね。
組み立てたイベントを送信するには、SseEventSink#sendを使います。仕様書などを見ても戻り値は無視されていますが、
このメソッドの戻り値はCompletionStageです。この戻り値を扱って後続の処理を行うこともあるでしょうね。
そしてスレッドプールにはJakarta ConcurrencyのManagedExecutorServiceを利用しました。
@Resource private ManagedExecutorService executorService;
Developer Guide / EE Concurrency Utilities / Managed Executor Service
これで、リソースメソッドを扱うスレッドがSSEに縛られなくなります。
イベントの送信が非同期になっていることがわかりやすくなるように、スリープとログ出力を入れています。
logger.info("resource method end");
このあたりもですね。
sleep();
});
sseEventSink.send(sse.newEvent("last message!!"));
ここで1度動作確認してみましょうか。
$ mvn wildfly:run
curlでアクセスします。
$ curl -i localhost:8080/sse/one-to-one
すると、まずWildFly側にログが出力されます。
18:03:57,706 INFO [org.littlewings.jaxrs.sse.SseResource] (default task-1) resource method end
curl側には1秒おきにメッセージが表示され、最後のメッセージを受信したら終了します。
HTTP/1.1 200 OK Connection: keep-alive Transfer-Encoding: chunked Content-Type: text/event-stream Date: Sat, 15 Feb 2025 09:03:57 GMT event: countup message1 data: message1 event: countup message2 data: message2 event: countup message3 data: message3 event: countup message4 data: message4 event: countup message5 data: message5 event: countup message6 data: message6 event: countup message7 data: message7 event: countup message8 data: message8 event: countup message9 data: message9 event: countup message10 data: message10 data: last message!!
これが終わると、WildFly側にもイベント送信が完了したことを表すログが出力されます。
18:04:07,741 INFO [org.littlewings.jaxrs.sse.SseResource] (EE-ManagedExecutorService-default-Thread-1) send events finish
リソースメソッド自体はすぐに終了し、イベント送信は別スレッドで行われることが確認できました。
ブロードキャストする
最後はブロードキャストです。
イベントを受け取る側は、こちらのリソースメソッドを呼び出します。
@GET @Path("/register") @Produces(MediaType.SERVER_SENT_EVENTS) public void register(@Context SseEventSink sseEventSink) { sseEventSink.send(sse.newEvent("welcome!!")); sseBroadcaster.register(sseEventSink); logger.info("client registered"); }
SseEventSinkをSseBroadcast#registerに登録します。
ここではスレッドプールは使いません。SseBroadcast#registerはすぐにリターンしますし、ここでイベントを送信する処理も
書かないからです。
一方で、イベントを送ることになるリソースメソッドはこちらです。
@POST @Path("/broadcast") @Consumes(MediaType.APPLICATION_JSON) public void broadcast(Map<String, String> body) { sseBroadcaster.broadcast(sse.newEvent(body.get("message"))); logger.info("send broadcast message"); }
SseBroadcast#broadcastでイベントを送信することで、SseBroadcast#registerで登録したSseEventSinkに対して
イベントを送信できます。
そしてSseBroadcastはSse#newBroadcasterで取得し、こちらもスレッドセーフです。
@PostConstruct void init() { sseBroadcaster = sse.newBroadcaster(); }
SseBroadcaster (Jakarta RESTful WS API 3.1.0 API)
では、動作確認してみます。
WildFlyを起動。
$ mvn wildfly:run
2つのクライアントを登録。
## ひとつ目 $ curl -i localhost:8080/sse/register HTTP/1.1 200 OK Connection: keep-alive Transfer-Encoding: chunked Content-Type: text/event-stream Date: Sat, 15 Feb 2025 09:25:03 GMT data: welcome!! ## 2つ目 $ curl -i localhost:8080/sse/register HTTP/1.1 200 OK Connection: keep-alive Transfer-Encoding: chunked Content-Type: text/event-stream Date: Sat, 15 Feb 2025 09:25:06 GMT data: welcome!!
クライアントはこのままメッセージを待ち続けます。
リソースメソッド側は、この時点で終了しています。
18:25:03,608 INFO [org.littlewings.jaxrs.sse.SseResource] (default task-1) client registered 18:25:06,793 INFO [org.littlewings.jaxrs.sse.SseResource] (default task-1) client registered
そして、別のクライアントからメッセージを送信してみます。
$ curl -i -XPOST -H 'Content-Type: application/json' localhost:8080/sse/broadcast -d '{"message": "Hello World"}'
HTTP/1.1 204 No Content
Date: Sat, 15 Feb 2025 09:25:47 GMT
$ curl -i -XPOST -H 'Content-Type: application/json' localhost:8080/sse/broadcast -d '{"message": "Hello WildFly"}'
HTTP/1.1 204 No Content
Date: Sat, 15 Feb 2025 09:26:27 GMT
それぞれのクライアントにメッセージが届きました。
## ひとつ目 data: Hello World data: Hello WildFly ## 2つ目 data: Hello World data: Hello WildFly
WildFly側のログ。
18:25:47,005 INFO [org.littlewings.jaxrs.sse.SseResource] (default task-1) send broadcast message 18:26:27,595 INFO [org.littlewings.jaxrs.sse.SseResource] (default task-1) send broadcast message
これで確認できましたね。
少しRESTEasyの実装を見てみる
RESTEasyの実装がどうなっているのか、少し見てみましょう。
SSEに関する実装は、こちらのパッケージにあるようです。
SseEventSinkの実装はこちら。
Jakarta Servletの非同期処理の仕組みを使っているようです。
Jakarta Servlet Specification / The Servlet Interface / Servlet Life Cycle / Asynchronous processing
SseEventSinkを作成しているのは、ContainerRequestFilterですね。
SSEかどうかの判定ですが
こちらで行っています。
メディアタイプがSSEであり、メソッドの引数にSseEventSinkがある場合にSSEと見なしています。またSSEをサポートする
リアクティブなアノテーション(@Stream)が付与されている場合もSSEと見なすようです。
あとは非同期のOutputStreamに対してレスポンスを書き込みます。
非同期として扱うのは、ここからのようですね。
Jakarta ServletのServletRequst#startAsyncを呼び出しているので、呼び出し側でスレッドを操作する必要があります。
このため今回はJakarta Concurrencyを使ったという感じですね。
HttpServletRequestとHttpServletResponseもそのままのものを使います。
ブロードキャストの方を見てみましょう。
SseBroadcaster#registerは、キューに登録して完了のようです。
ではSseBroadcaster#broadcastはどうかというと、キューに登録されたSseEventSinkに対してイベントを送信します。
この時にCompletableFutureを使ってはいますが、CompletableFutureは構築済みの値を使って開始しているので
実質同期処理みたいですね。
これでおよそどういう動きをしているかは読めたかなと思います。
おわりに
WildFly 35j(というかRESTEasy)でServer-Sent Eventsを試してみました。
使い方自体はスレッドプールの扱いに気をつければそれほど難しくない気はしますが、裏の仕組みもちょっと追ってみたので
なかなか面白かったですね。
せっかくなので覚えておきましょう。