最近、とある人から「PayaraのCDI Event Busの仕組みを知りたいんですけど、どうやってるんでしょうか?」と聞かれました。
言葉自体初めて聞くのに、なにそれ??みたいな感じでしたが。
まあ、Payaraのブログの絵を見せてもらって、「あ、これって…」と思うところがあったので調べてみることにしました。
Clustered CDI Event Busとは?
Payaraのブログで紹介されているのは、こちら。
Microservice choreography with the Payara Micro clustered CDI Event Bus
正確に言うと、Payara Microの機能みたいですね。
Microservicesの文脈で紹介されていますが、CDIのEvent APIベースの機能で、あるPayara Microのインスタンスで発生したCDI Eventを、別のPayara Microのインスタンスに伝播させるもののようです。
サンプルとしては、GitHubにこちらが置かれています。
Payara-Examples/Payara-Micro/cdi-clustered-events at master · payara/Payara-Examples · GitHub
で、実装を見てみると、どうやらHazelcastのTopicを使って実現しているようです。
該当するパッケージは、このあたりです。
https://github.com/payara/Payara/tree/payara-server-4.1.1.162/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/eventbus
https://github.com/payara/Payara/tree/payara-server-4.1.1.162/appserver/payara-appserver-modules/payara-micro-cdi/src/main/java/fish/payara/micro/cdi
実際にHazelcastのTopic&MessageListenerを使っているのは、こちら。
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/eventbus/EventBus.java#L69
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/eventbus/TopicListener.java#L28
と済ませるのはアレなので、もう少し追ってみましょう。
HazelcastのTopicとは
Hazelcastが提供するデータ構造のひとつで、メッセージのPublish/Subscribeのモデルを提供するものです。
Subscriber側は各Nodeで、かつ複数紐づけることができます。
簡単な例を書いてみましょう。
まずは、Topicにメッセージを受け取るためのListener、MessageListenerインターフェースを実装して浮いていてもらうクラスを用意。
HazelcastTopicReceiver.java
import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.ITopic; import com.hazelcast.core.Message; import com.hazelcast.core.MessageListener; public class HazelcastTopicReceiver { public static void main(String... args) { HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(); ITopic<String> topic = hazelcast.getTopic("default"); topic.addMessageListener(new MyListener()); System.console().readLine("> Enter stop."); hazelcast.getLifecycleService().shutdown(); Hazelcast.shutdownAll(); } static class MyListener implements MessageListener<String> { @Override public void onMessage(Message<String> message) { System.out.printf("received message = %s%n", message.getMessageObject()); } } }
このプログラムは起動するとコンソールで待ち続けますが、Enterを打つと終了します。
メッセージを受け取るには、MessageListenerインターフェースを実装したクラスを用意し、ITopic#addMessageListenerすればOKです。
なお、MessageListenerはSerializableである必要はありません。Local Nodeのみで動作するからです。
そして、メッセージ送信側。
HazelcastTopicSender.java
import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.ITopic; import com.hazelcast.core.Message; import com.hazelcast.core.MessageListener; public class HazelcastTopicSender { public static void main(String... args) { HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(); ITopic<String> topic = hazelcast.getTopic("default"); topic.publish(args[0]); hazelcast.getLifecycleService().shutdown(); Hazelcast.shutdownAll(); } }
ITopic#publishすることで、メッセージを送信することができます。ITopic#publishで送り出すメッセージは、Serializableである必要があります。
よって、Payara MicroのClustered CDI Event BusでEvent#fireに渡す引数も、やはりSerializableである必要があります。
動作させてみましょう。
まず、受信側のNodeを2つ起動してみます。
※Hazelcastのバージョンは、Payaraに合わせました
## Node 1 $ java -cp hazelcast-3.6.2.jar:. HazelcastTopicReceiver ## Node 2 $ java -cp hazelcast-3.6.2.jar:. HazelcastTopicReceiver
クラスタが構成されます。
Members [2] { Member [172.17.0.1]:5701 this Member [172.17.0.1]:5702 }
では、メッセージを送ってみましょう。
$ java -cp hazelcast-3.6.2.jar:. HazelcastTopicSender "Hello Cluster"
すると、待機していた2つのNode両方のコンソールに、送ったメッセージが表示されます。
## Node 1 received message = Hello Cluster ## Node 2 received message = Hello Cluster
このように、Topicを介して各Nodeにメッセージを送信、Listenerで受信できる仕組みとなります。
今回はメッセージの送受信でクラスを分けましたが、ひとつのNodeでメッセージの送信、受信を行うプログラムを書いてもなんら問題はありません。
では、続けてPayara MicroでのClustered CDI Event Busを見ていってみましょう。
Payara MicroのClustered CDI Event Busを使う
それでは、GitHubのサンプルリポジトリを参考に、Payara MicroのClustered CDI Event Busを使うプログラムを書いてみましょう。
準備
Payara MicroのJARファイルはダウンロード済みとします。今回使うバージョンは、「4.1.1.162」です。
Maven依存関係は、以下のとおり。
<dependency> <groupId>fish.payara.extras</groupId> <artifactId>payara-micro</artifactId> <version>4.1.1.162</version> <scope>provided</scope> </dependency>
できあがるWARファイルの名前は、「app.war」とします。
メッセージ用のクラスの作成
CDI Event APIで、送受信に使用するクラスを定義します。
単にStringのメッセージを持つだけですが、追加としてPayara MicroのインスタンスのUUIDも設定できるようにしています。これは、サンプルのマネですが。
src/main/java/org/littlewings/payara/cdi/EventMessage.java
package org.littlewings.payara.cdi; import java.io.Serializable; public class EventMessage implements Serializable { private static final long serialVersionUID = 1L; private String value; private String memberId; public EventMessage(String value, String memberId) { this.value = value; this.memberId = memberId; } public String getValue() { return value; } public String getMemberId() { return memberId; } }
メッセージの送信側
メッセージの送信側は、JAX-RSでコードを作成することにします。
JAX-RSの有効化。
src/main/java/org/littlewings/payara/rest/JarsApplication.java
package org.littlewings.payara.rest; import javax.ws.rs.ApplicationPath; import javax.ws.rs.core.Application; @ApplicationPath("rest") public class JarsApplication extends Application { }
QueryStringで受け取ったメッセージを、CDIのイベントに送るクラス。
src/main/java/org/littlewings/payara/rest/EventSendResource.java
package org.littlewings.payara.rest; import javax.enterprise.context.RequestScoped; import javax.enterprise.event.Event; import javax.inject.Inject; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import fish.payara.micro.PayaraMicroRuntime; import fish.payara.micro.cdi.Outbound; import org.littlewings.payara.cdi.EventMessage; @Path("event") @RequestScoped public class EventSendResource { @Inject @Outbound private Event<EventMessage> event; @Inject private PayaraMicroRuntime runtime; @GET @Path("send") @Produces(MediaType.TEXT_PLAIN) public String send(@QueryParam("message") String message) { event.fire(new EventMessage(message, runtime.getLocalDescriptor().getMemberUUID())); return "OK!"; } }
特になんの変哲もない、JAX-RSとCDI Event APIを使ったクラスですね?
…と言いたいところですが、よーく見るとEventの@Injectする時に、@Outboundアノテーションが付いています。
@Inject @Outbound private Event<EventMessage> event;
メッセージの受信側
続いて、メッセージの受信側を作成します。
src/main/java/org/littlewings/payara/cdi/EventMessageReceiver.java
package org.littlewings.payara.cdi; import javax.enterprise.context.Dependent; import javax.enterprise.event.Observes; import fish.payara.micro.cdi.Inbound; @Dependent public class EventMessageReceiver { public void observe(@Observes @Inbound EventMessage event) { System.out.printf("received message = %s, member = %s%n", event.getValue(), event.getMemberId()); } }
こちらも普通のCDIの@Observesを利用したもの、と思いきや、@Inboundアノテーションが付いています。
public void observe(@Observes @Inbound EventMessage event) {
@Outboundも@Inboundも、実は@Qualifierです。
ServletContextListenerを作る
用意はこれだけで終わりではなくて、Clustered CDI Event Busの初期化が必要です。
追記)
Payara 171以降では、初期化処理は不要になりました。
Payara MicroのClustered CDI Event Busが初期化処理が不要になり、loopbackができるようになったという話 - CLOVER
今回は、ServletContextListenerを使って行うことにしました。
src/main/java/org/littlewings/payara/servlet/ClusteredEventBusInitializeListener.java
package org.littlewings.payara.servlet; import javax.inject.Inject; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import javax.servlet.annotation.WebListener; import fish.payara.micro.cdi.ClusteredCDIEventBus; @WebListener public class ClusteredEventBusInitializeListener implements ServletContextListener { @Inject private ClusteredCDIEventBus clusteredEventBus; @Override public void contextInitialized(ServletContextEvent servletContextEvent) { clusteredEventBus.initialize(); } @Override public void contextDestroyed(ServletContextEvent servletContextEvent) { // no-op } }
ClusteredCDIEventBusを@Injectしてinitializeしていますが
@Inject private ClusteredCDIEventBus clusteredEventBus;
実はこのinitializeを呼ぶのはどうでもよくて(ログ出力のみなので)、実際にはClusteredCDIEventBusの@PostConstructを呼び出すのが目的のようです。
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/appserver/payara-appserver-modules/payara-micro-cdi/src/main/java/fish/payara/micro/cdi/ClusteredCDIEventBus.java#L81-L85
この処理は、Servletの起動時には行われている必要があるようで、サンプルでも
https://github.com/payara/Payara-Examples/blob/master/Payara-Micro/cdi-clustered-events/event-sender/src/main/java/fish/payara/examples/payaramicro/event/sender/SendEventServlet.java#L124-L131
@Override public void init() throws ServletException { super.init(); // you must intitialize the Event Bus in your servlet for events to flow. bus.initialize(); }
みたいに書かれていて、Servlet#initで初期化せよ、と。
しかも、loadOnStartupも1ですし。
https://github.com/payara/Payara-Examples/blob/master/Payara-Micro/cdi-clustered-events/event-sender/src/main/java/fish/payara/examples/payaramicro/event/sender/SendEventServlet.java#L38-L39
@WebServlet(name = "SendEventServlet", urlPatterns = {"/SendEventServlet"}, loadOnStartup = 1) public class SendEventServlet extends HttpServlet {
最初、これに気付かなくてJAX-RSのリソースクラスで初期化してたら、うまく動かなくてハマりました…。
CDIを使っていると最終的には有効になるようですが、CDI Beanの初期化のタイミングで、実際にメッセージを受け取れるタイミングがバラバラになったりするので、起動時に初期化しておいた方がよいです。
これで、準備は完了となります。
動かしてみる
それでは、作成したプログラムを動かしてみます。
まずはパッケージング。
$ mvn package
app.warというファイルができます。
ここで注意点ですが、Payara MicroのClustered CDI Event Busを使用するには、最低でもPayara Microのインスタンスが2 Node以上必要になります。
これは、イベントを発火させたNode自体は、イベントを受け取らないためです。なので、1 Nodeだと何も起こらないように見えてしまいます…。
このあたりは、あとで説明します。
では、Payara MicroでWARをデプロイして確認してみましょう。今回は3 Node起動してみます。
## Node 1 $ java -jar payara-micro-4.1.1.162.jar --deploy target/app.war --autoBindHttp ## Node 2 $ java -jar payara-micro-4.1.1.162.jar --deploy target/app.war --autoBindHttp ## Node 3 $ java -jar payara-micro-4.1.1.162.jar --deploy target/app.war --autoBindHttp
ポートの割り振りは、--autoBindHttpで行いました。
これでポート8080、8081、8082でリッスンするPayara Microのインスタンスが3つ立ち上がります。
クラスタも構成されると。
Members [3] { Member [172.17.0.1]:5900 this Member [172.17.0.1]:5901 Member [172.17.0.1]:5902 }
$ curl http://localhost:8080/app/rest/event/send?message=HelloWorld-to-node1 OK!
結果、コンソールへは以下のような出力になります。
## Node 1 # なにもなし ## Node 2 received message = HelloWorld-to-node1, member = ea64035d-a4a8-46c2-a57e-d949783f637b ## Node 3 received message = HelloWorld-to-node1, member = ea64035d-a4a8-46c2-a57e-d949783f637b
今度は、Node 3に送ってみましょう。
$ curl http://localhost:8082/app/rest/event/send?message=HelloWorld-to-node3 OK!
結果。
## Node 1 received message = HelloWorld-to-node3, member = a5b94764-8cab-4249-8350-62fbffdc7f25 ## Node 2 received message = HelloWorld-to-node3, member = a5b94764-8cab-4249-8350-62fbffdc7f25 ## Node 3 # なにもなし
というわけで、メッセージを受けたNode以外がイベント通知を受信します。
とりあえず、動いてそうですね?
で、どうなってるのか?
ここで、少しトレースするために、イベントを受け取るクラスでスタックトレースを出力するようにしてみましょう。
@Dependent public class EventMessageReceiver { public void observe(@Observes @Inbound EventMessage event) { System.out.printf("received message = %s, member = %s%n", event.getValue(), event.getMemberId()); Thread.dumpStack(); } }
これで再度確認すると、こんなスタックトレースが得られます。
java.lang.Exception: Stack trace at java.lang.Thread.dumpStack(Thread.java:1329) at org.littlewings.payara.cdi.EventMessageReceiver.observe(EventMessageReceiver.java:13) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.jboss.weld.injection.StaticMethodInjectionPoint.invoke(StaticMethodInjectionPoint.java:88) at org.jboss.weld.injection.StaticMethodInjectionPoint.invoke(StaticMethodInjectionPoint.java:78) at org.jboss.weld.injection.MethodInvocationStrategy$SimpleMethodInvocationStrategy.invoke(MethodInvocationStrategy.java:129) at org.jboss.weld.event.ObserverMethodImpl.sendEvent(ObserverMethodImpl.java:309) at org.jboss.weld.event.ObserverMethodImpl.sendEvent(ObserverMethodImpl.java:287) at org.jboss.weld.event.ObserverMethodImpl.notify(ObserverMethodImpl.java:265) at org.jboss.weld.event.ObserverNotifier.notifySyncObservers(ObserverNotifier.java:271) at org.jboss.weld.event.ObserverNotifier.notify(ObserverNotifier.java:260) at org.jboss.weld.event.EventImpl.fire(EventImpl.java:89) at fish.payara.micro.cdi.ClusteredCDIEventBus.eventReceived(ClusteredCDIEventBus.java:60) at fish.payara.micro.services.PayaraMicroInstance.receiveMessage(PayaraMicroInstance.java:146) at fish.payara.nucleus.eventbus.TopicListener.onMessage(TopicListener.java:56) at com.hazelcast.topic.impl.TopicService.dispatchEvent(TopicService.java:131) at com.hazelcast.spi.impl.eventservice.impl.EventProcessor.process(EventProcessor.java:48) at com.hazelcast.spi.impl.eventservice.impl.RemoteEventProcessor.run(RemoteEventProcessor.java:36) at com.hazelcast.util.executor.StripedExecutor$Worker.process(StripedExecutor.java:187) at com.hazelcast.util.executor.StripedExecutor$Worker.run(StripedExecutor.java:171)
バッチリHazelcastのTopicが使われていますね。
Payara MicroでHazelcastを使えるようにすると、TopicにMessageListenerが登録されます。
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/eventbus/EventBus.java#L69
送信するメッセージは、ClusterMessageとしてラップしてTopicに登録されます、
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/eventbus/EventBus.java#L56
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/eventbus/ClusterMessage.java
TopicのMessageListenerは、ClusterMessageの形式で受け取ります。
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/nucleus/payara-modules/hazelcast-bootstrap/src/main/java/fish/payara/nucleus/eventbus/TopicListener.java#L28
あとはスタックトレースが指す通り、Local NodeのCDI Eventが発行され、CDIのイベント受信側のクラスが起動するという仕掛けです。
at org.jboss.weld.event.EventImpl.fire(EventImpl.java:89) at fish.payara.micro.cdi.ClusteredCDIEventBus.eventReceived(ClusteredCDIEventBus.java:60) at fish.payara.micro.services.PayaraMicroInstance.receiveMessage(PayaraMicroInstance.java:146) at fish.payara.nucleus.eventbus.TopicListener.onMessage(TopicListener.java:56)
でも、この理屈だと全Nodeがイベントを受け取ることになるのですが、イベントが発生したNode自体は通知を受けていなかったと思います。
追記)
Payara 171以降では、イベントが発生したNodeでもイベントを受け取れるようになりました。
Payara MicroのClustered CDI Event Busが初期化処理が不要になり、loopbackができるようになったという話 - CLOVER
これはどういうことになっているかというと、自Nodeだけはループバックを無視するようになっているからです。
https://github.com/payara/Payara/blob/payara-server-4.1.1.162/nucleus/payara-modules/payara-micro-service/src/main/java/fish/payara/micro/services/PayaraMicroInstance.java#L143-L144
if (!cast.isLoopBack() && cast.getInstanceDescriptor().getMemberUUID().equals(myCurrentID)) { // ignore this message as it is a loopback } else { myListener.eventReceived(cast); }
仕組みは、わかった感じですね。