CLOVER🍀

That was when it all began.

Payara MicroのClustered CDI Event Bus(Hazelcast Topic)の仕組み

最近、とある人から「PayaraのCDI Event Busの仕組みを知りたいんですけど、どうやってるんでしょうか?」と聞かれました。

言葉自体初めて聞くのに、なにそれ??みたいな感じでしたが。

まあ、Payaraのブログの絵を見せてもらって、「あ、これって…」と思うところがあったので調べてみることにしました。

Clustered CDI Event Busとは?

Payaraのブログで紹介されているのは、こちら。

Microservice choreography with the Payara Micro clustered CDI Event Bus

正確に言うと、Payara Microの機能みたいですね。

Microservicesの文脈で紹介されていますが、CDIのEvent APIベースの機能で、あるPayara Microのインスタンスで発生したCDI Eventを、別のPayara Microのインスタンスに伝播させるもののようです。

※ブログの画像より

サンプルとしては、GitHubにこちらが置かれています。

Payara-Examples/Payara-Micro/cdi-clustered-events at master · payara/Payara-Examples · GitHub

で、実装を見てみると、どうやらHazelcastのTopicを使って実現しているようです。

該当するパッケージは、このあたりです。
https://github.com/payara/Payara/tree/payara-server-4.1.1.162/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/eventbus
https://github.com/payara/Payara/tree/payara-server-4.1.1.162/appserver/payara-appserver-modules/payara-micro-cdi/src/main/java/fish/payara/micro/cdi

実際にHazelcastのTopic&MessageListenerを使っているのは、こちら。
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/eventbus/EventBus.java#L69
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/eventbus/TopicListener.java#L28

と済ませるのはアレなので、もう少し追ってみましょう。

HazelcastのTopicとは

Hazelcastが提供するデータ構造のひとつで、メッセージのPublish/Subscribeのモデルを提供するものです。

Topic

Subscriber側は各Nodeで、かつ複数紐づけることができます。

簡単な例を書いてみましょう。

まずは、Topicにメッセージを受け取るためのListener、MessageListenerインターフェースを実装して浮いていてもらうクラスを用意。
HazelcastTopicReceiver.java

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;

public class HazelcastTopicReceiver {
    public static void main(String... args) {
        HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
        ITopic<String> topic = hazelcast.getTopic("default");

        topic.addMessageListener(new MyListener());

        System.console().readLine("> Enter stop.");

        hazelcast.getLifecycleService().shutdown();
        Hazelcast.shutdownAll();
    }

    static class MyListener implements MessageListener<String> {
        @Override
        public void onMessage(Message<String> message) {
            System.out.printf("received message = %s%n", message.getMessageObject());
        }
    }
}

このプログラムは起動するとコンソールで待ち続けますが、Enterを打つと終了します。

メッセージを受け取るには、MessageListenerインターフェースを実装したクラスを用意し、ITopic#addMessageListenerすればOKです。

なお、MessageListenerはSerializableである必要はありません。Local Nodeのみで動作するからです。

そして、メッセージ送信側。
HazelcastTopicSender.java

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;

public class HazelcastTopicSender {
    public static void main(String... args) {
        HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
        ITopic<String> topic = hazelcast.getTopic("default");

        topic.publish(args[0]);

        hazelcast.getLifecycleService().shutdown();
        Hazelcast.shutdownAll();
    }
}

ITopic#publishすることで、メッセージを送信することができます。ITopic#publishで送り出すメッセージは、Serializableである必要があります。

よって、Payara MicroのClustered CDI Event BusでEvent#fireに渡す引数も、やはりSerializableである必要があります。

動作させてみましょう。

まず、受信側のNodeを2つ起動してみます。
※Hazelcastのバージョンは、Payaraに合わせました

## Node 1
$ java -cp hazelcast-3.6.2.jar:. HazelcastTopicReceiver

## Node 2
$ java -cp hazelcast-3.6.2.jar:. HazelcastTopicReceiver

クラスタが構成されます。

Members [2] {
	Member [172.17.0.1]:5701 this
	Member [172.17.0.1]:5702
}

では、メッセージを送ってみましょう。

$ java -cp hazelcast-3.6.2.jar:. HazelcastTopicSender "Hello Cluster"

すると、待機していた2つのNode両方のコンソールに、送ったメッセージが表示されます。

## Node 1
received message = Hello Cluster

## Node 2
received message = Hello Cluster

このように、Topicを介して各Nodeにメッセージを送信、Listenerで受信できる仕組みとなります。

今回はメッセージの送受信でクラスを分けましたが、ひとつのNodeでメッセージの送信、受信を行うプログラムを書いてもなんら問題はありません。

では、続けてPayara MicroでのClustered CDI Event Busを見ていってみましょう。

Payara MicroのClustered CDI Event Busを使う

それでは、GitHubのサンプルリポジトリを参考に、Payara MicroのClustered CDI Event Busを使うプログラムを書いてみましょう。

準備

Payara MicroのJARファイルはダウンロード済みとします。今回使うバージョンは、「4.1.1.162」です。

Maven依存関係は、以下のとおり。

        <dependency>
            <groupId>fish.payara.extras</groupId>
            <artifactId>payara-micro</artifactId>
            <version>4.1.1.162</version>
            <scope>provided</scope>
        </dependency>

できあがるWARファイルの名前は、「app.war」とします。

メッセージ用のクラスの作成

CDI Event APIで、送受信に使用するクラスを定義します。

単にStringのメッセージを持つだけですが、追加としてPayara MicroのインスタンスのUUIDも設定できるようにしています。これは、サンプルのマネですが。
src/main/java/org/littlewings/payara/cdi/EventMessage.java

package org.littlewings.payara.cdi;

import java.io.Serializable;

public class EventMessage implements Serializable {
    private static final long serialVersionUID = 1L;

    private String value;

    private String memberId;

    public EventMessage(String value, String memberId) {
        this.value = value;
        this.memberId = memberId;
    }

    public String getValue() {
        return value;
    }

    public String getMemberId() {
        return memberId;
    }
}
メッセージの送信側

メッセージの送信側は、JAX-RSでコードを作成することにします。

JAX-RSの有効化。
src/main/java/org/littlewings/payara/rest/JarsApplication.java

package org.littlewings.payara.rest;

import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;

@ApplicationPath("rest")
public class JarsApplication extends Application {
}

QueryStringで受け取ったメッセージを、CDIのイベントに送るクラス。
src/main/java/org/littlewings/payara/rest/EventSendResource.java

package org.littlewings.payara.rest;

import javax.enterprise.context.RequestScoped;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;

import fish.payara.micro.PayaraMicroRuntime;
import fish.payara.micro.cdi.Outbound;
import org.littlewings.payara.cdi.EventMessage;

@Path("event")
@RequestScoped
public class EventSendResource {
    @Inject
    @Outbound
    private Event<EventMessage> event;

    @Inject
    private PayaraMicroRuntime runtime;

    @GET
    @Path("send")
    @Produces(MediaType.TEXT_PLAIN)
    public String send(@QueryParam("message") String message) {
        event.fire(new EventMessage(message, runtime.getLocalDescriptor().getMemberUUID()));

        return "OK!";
    }
}

特になんの変哲もない、JAX-RSCDI Event APIを使ったクラスですね?

…と言いたいところですが、よーく見るとEventの@Injectする時に、@Outboundアノテーションが付いています。

    @Inject
    @Outbound
    private Event<EventMessage> event;

Clustered CDI Event Busを使う時は、このアノテーションが必要です。

メッセージの受信側

続いて、メッセージの受信側を作成します。
src/main/java/org/littlewings/payara/cdi/EventMessageReceiver.java

package org.littlewings.payara.cdi;

import javax.enterprise.context.Dependent;
import javax.enterprise.event.Observes;

import fish.payara.micro.cdi.Inbound;

@Dependent
public class EventMessageReceiver {
    public void observe(@Observes @Inbound EventMessage event) {
        System.out.printf("received message = %s, member = %s%n", event.getValue(), event.getMemberId());
    }
}

こちらも普通のCDIの@Observesを利用したもの、と思いきや、@Inboundアノテーションが付いています。

    public void observe(@Observes @Inbound EventMessage event) {

@Outboundも@Inboundも、実は@Qualifierです

ServletContextListenerを作る

用意はこれだけで終わりではなくて、Clustered CDI Event Busの初期化が必要です。

追記
Payara 171以降では、初期化処理は不要になりました。
Payara MicroのClustered CDI Event Busが初期化処理が不要になり、loopbackができるようになったという話 - CLOVER

今回は、ServletContextListenerを使って行うことにしました。
src/main/java/org/littlewings/payara/servlet/ClusteredEventBusInitializeListener.java

package org.littlewings.payara.servlet;

import javax.inject.Inject;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;

import fish.payara.micro.cdi.ClusteredCDIEventBus;

@WebListener
public class ClusteredEventBusInitializeListener implements ServletContextListener {
    @Inject
    private ClusteredCDIEventBus clusteredEventBus;

    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        clusteredEventBus.initialize();
    }

    @Override
    public void contextDestroyed(ServletContextEvent servletContextEvent) {
        // no-op
    }
}

ClusteredCDIEventBusを@Injectしてinitializeしていますが

    @Inject
    private ClusteredCDIEventBus clusteredEventBus;

実はこのinitializeを呼ぶのはどうでもよくて(ログ出力のみなので)、実際にはClusteredCDIEventBusの@PostConstructを呼び出すのが目的のようです。
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/appserver/payara-appserver-modules/payara-micro-cdi/src/main/java/fish/payara/micro/cdi/ClusteredCDIEventBus.java#L81-L85

この処理は、Servletの起動時には行われている必要があるようで、サンプルでも
https://github.com/payara/Payara-Examples/blob/master/Payara-Micro/cdi-clustered-events/event-sender/src/main/java/fish/payara/examples/payaramicro/event/sender/SendEventServlet.java#L124-L131

    @Override
    public void init() throws ServletException {
        super.init(); 
        
        // you must intitialize the Event Bus in your servlet for events to flow.
        bus.initialize();

}

みたいに書かれていて、Servlet#initで初期化せよ、と。

しかも、loadOnStartupも1ですし。
https://github.com/payara/Payara-Examples/blob/master/Payara-Micro/cdi-clustered-events/event-sender/src/main/java/fish/payara/examples/payaramicro/event/sender/SendEventServlet.java#L38-L39

@WebServlet(name = "SendEventServlet", urlPatterns = {"/SendEventServlet"}, loadOnStartup = 1)
public class SendEventServlet extends HttpServlet {

最初、これに気付かなくてJAX-RSのリソースクラスで初期化してたら、うまく動かなくてハマりました…。

CDIを使っていると最終的には有効になるようですが、CDI Beanの初期化のタイミングで、実際にメッセージを受け取れるタイミングがバラバラになったりするので、起動時に初期化しておいた方がよいです。

これで、準備は完了となります。

動かしてみる

それでは、作成したプログラムを動かしてみます。

まずはパッケージング。

$ mvn package

app.warというファイルができます。

ここで注意点ですが、Payara MicroのClustered CDI Event Busを使用するには、最低でもPayara Microのインスタンスが2 Node以上必要になります。

これは、イベントを発火させたNode自体は、イベントを受け取らないためです。なので、1 Nodeだと何も起こらないように見えてしまいます…。

このあたりは、あとで説明します。

では、Payara MicroでWARをデプロイして確認してみましょう。今回は3 Node起動してみます。

## Node 1
$ java -jar payara-micro-4.1.1.162.jar --deploy target/app.war --autoBindHttp

## Node 2
$ java -jar payara-micro-4.1.1.162.jar --deploy target/app.war --autoBindHttp

## Node 3
$ java -jar payara-micro-4.1.1.162.jar --deploy target/app.war --autoBindHttp

ポートの割り振りは、--autoBindHttpで行いました。

これでポート8080、8081、8082でリッスンするPayara Microのインスタンスが3つ立ち上がります。

クラスタも構成されると。

Members [3] {
	Member [172.17.0.1]:5900 this
	Member [172.17.0.1]:5901
	Member [172.17.0.1]:5902
}

では、curlでNode 1にリクエストを投げてみます。

$ curl http://localhost:8080/app/rest/event/send?message=HelloWorld-to-node1
OK!

結果、コンソールへは以下のような出力になります。

## Node 1
# なにもなし

## Node 2
received message = HelloWorld-to-node1, member = ea64035d-a4a8-46c2-a57e-d949783f637b

## Node 3
received message = HelloWorld-to-node1, member = ea64035d-a4a8-46c2-a57e-d949783f637b

今度は、Node 3に送ってみましょう。

$ curl http://localhost:8082/app/rest/event/send?message=HelloWorld-to-node3
OK!

結果。

## Node 1
received message = HelloWorld-to-node3, member = a5b94764-8cab-4249-8350-62fbffdc7f25

## Node 2
received message = HelloWorld-to-node3, member = a5b94764-8cab-4249-8350-62fbffdc7f25

## Node 3
# なにもなし

というわけで、メッセージを受けたNode以外がイベント通知を受信します。

とりあえず、動いてそうですね?

で、どうなってるのか?

ここで、少しトレースするために、イベントを受け取るクラスでスタックトレースを出力するようにしてみましょう。

@Dependent
public class EventMessageReceiver {
    public void observe(@Observes @Inbound EventMessage event) {
        System.out.printf("received message = %s, member = %s%n", event.getValue(), event.getMemberId());

        Thread.dumpStack();
    }
}

これで再度確認すると、こんなスタックトレースが得られます。

java.lang.Exception: Stack trace
	at java.lang.Thread.dumpStack(Thread.java:1329)
	at org.littlewings.payara.cdi.EventMessageReceiver.observe(EventMessageReceiver.java:13)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.jboss.weld.injection.StaticMethodInjectionPoint.invoke(StaticMethodInjectionPoint.java:88)
	at org.jboss.weld.injection.StaticMethodInjectionPoint.invoke(StaticMethodInjectionPoint.java:78)
	at org.jboss.weld.injection.MethodInvocationStrategy$SimpleMethodInvocationStrategy.invoke(MethodInvocationStrategy.java:129)
	at org.jboss.weld.event.ObserverMethodImpl.sendEvent(ObserverMethodImpl.java:309)
	at org.jboss.weld.event.ObserverMethodImpl.sendEvent(ObserverMethodImpl.java:287)
	at org.jboss.weld.event.ObserverMethodImpl.notify(ObserverMethodImpl.java:265)
	at org.jboss.weld.event.ObserverNotifier.notifySyncObservers(ObserverNotifier.java:271)
	at org.jboss.weld.event.ObserverNotifier.notify(ObserverNotifier.java:260)
	at org.jboss.weld.event.EventImpl.fire(EventImpl.java:89)
	at fish.payara.micro.cdi.ClusteredCDIEventBus.eventReceived(ClusteredCDIEventBus.java:60)
	at fish.payara.micro.services.PayaraMicroInstance.receiveMessage(PayaraMicroInstance.java:146)
	at fish.payara.nucleus.eventbus.TopicListener.onMessage(TopicListener.java:56)
	at com.hazelcast.topic.impl.TopicService.dispatchEvent(TopicService.java:131)
	at com.hazelcast.spi.impl.eventservice.impl.EventProcessor.process(EventProcessor.java:48)
	at com.hazelcast.spi.impl.eventservice.impl.RemoteEventProcessor.run(RemoteEventProcessor.java:36)
	at com.hazelcast.util.executor.StripedExecutor$Worker.process(StripedExecutor.java:187)
	at com.hazelcast.util.executor.StripedExecutor$Worker.run(StripedExecutor.java:171)

バッチリHazelcastのTopicが使われていますね。

Payara MicroでHazelcastを使えるようにすると、TopicにMessageListenerが登録されます。
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/eventbus/EventBus.java#L69

送信するメッセージは、ClusterMessageとしてラップしてTopicに登録されます、
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/eventbus/EventBus.java#L56
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/eventbus/ClusterMessage.java

TopicのMessageListenerは、ClusterMessageの形式で受け取ります。
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/eventbus/TopicListener.java#L28

あとはスタックトレースが指す通り、Local NodeのCDI Eventが発行され、CDIのイベント受信側のクラスが起動するという仕掛けです。

	at org.jboss.weld.event.EventImpl.fire(EventImpl.java:89)
	at fish.payara.micro.cdi.ClusteredCDIEventBus.eventReceived(ClusteredCDIEventBus.java:60)
	at fish.payara.micro.services.PayaraMicroInstance.receiveMessage(PayaraMicroInstance.java:146)
	at fish.payara.nucleus.eventbus.TopicListener.onMessage(TopicListener.java:56)

でも、この理屈だと全Nodeがイベントを受け取ることになるのですが、イベントが発生したNode自体は通知を受けていなかったと思います。

追記
Payara 171以降では、イベントが発生したNodeでもイベントを受け取れるようになりました。
Payara MicroのClustered CDI Event Busが初期化処理が不要になり、loopbackができるようになったという話 - CLOVER

これはどういうことになっているかというと、自Nodeだけはループバックを無視するようになっているからです。
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/nucleus/payara-modules/payara-micro-service/src/main/java/fish/payara/micro/services/PayaraMicroInstance.java#L143-L144

                if (!cast.isLoopBack() && cast.getInstanceDescriptor().getMemberUUID().equals(myCurrentID)) {
                    // ignore this message as it is a loopback
                } else {
                    myListener.eventReceived(cast);
                }

仕組みは、わかった感じですね。

まとめ

Payara MicroのClustered CDI Event Busを使うのと、その内部の動きをある程度トレースしてみました。

Clustered CDI Event Busを使う時の注意点ですが、まず単一Nodeではイベントが受信できなくなるということです。あとは、送るメッセージはSerializableである必要があるということですね。

とりあえず、仕組みと性質はわかった感じなので、よしとしましょう。