以前書いたこちらのエントリ
Hazelcast Internal(構造編) - CLOVER
の続編です。
今回は、ネットワークまわりを扱います。前回は、PartitionやRecordStoreなどの内部構造をテーマに扱いました。
前回同様にこのエントリを読むような方は、
- ある程度Hazelcastの基本的な使い方を知っている
- Hazelcastの中の構造などに興味がある
といった方々を対象とします。やっぱり、対象範囲は狭いことに変わりはありませんが。
このエントリで解説する内容
以下のテーマを題材にします。
- Node Discovery(※)
- Heartbeat
- RPC
※…今のNode Discoveryの仕組みは、近いうちになくなるかもしれません
まあ、RPCの部分は前回とちょっとかぶるので、少し薄めになりますが。
それぞれ、順を追って書いていってみましょう。
Hazelcastのバージョンは、都合上3.7.1を対象とします。
その前に
と、その前に、Hazelcastのライブラリ事情とロギングについて少し。
Hazelcastの特徴として、他ライブラリへの依存関係がない、ということが挙げられます。Embeddedに使うだけなら、hazelcast.jar単一で済みます。ネットワークまわりも、Nettyなどを用いずすべて自前実装です。このため、ほとんどの内容はHazelcastのソースコードを追いかければ(根気があれば)読み解くことができます。
あと、ロギングはデフォルトはjava.util.loggingですが、設定変更と依存関係の追加でLog4j、SLF4Jに切り替えることができます。
例えば、ロギングをSLF4J&Logbackに切り替える場合は、Hazelcastの設定ファイルを用意して、プロパティ「hazelcast.logging.type」を「slf4j」とします。
src/main/resources/hazelcast.xml
<?xml version="1.0" encoding="UTF-8"?> <hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.7.xsd" xmlns="http://www.hazelcast.com/schema/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <group> <name>my-cluster</name> <password>my-cluster-pass</password> </group> <network> <port auto-increment="true" port-count="100">5701</port> <join> <multicast enabled="true"> <multicast-group>224.2.2.3</multicast-group> <multicast-port>54327</multicast-port> </multicast> </join> </network> <properties> <property name="hazelcast.logging.type">slf4j</property> </properties> </hazelcast>
Maven依存関係にもSLF4JとLogbackを足しておきます。
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.21</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.7</version> </dependency>
Logbackの設定ファイルも用意しますが、この時Log LevelをDEBUGなどにすると、より詳細なトレース情報を得ることができます。
※デフォルトはjava.util.loggingが使われるので、この設定ファイルを用意してもいいですが
src/main/resources/logback.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %t %msg%n</pattern> </encoder> </appender> <root level="debug"> <appender-ref ref="STDOUT"/> </root> </configuration>
Nodeの追加や削除、Heartbeatのログなども得られるので、困った時や情報を知りたい時には使ってみるとよいでしょう。
例えば、Node追加時のログ。
2016-10-16 16:48:40.844 DEBUG hz._hzInstance_1_my-cluster.MulticastThread [172.17.0.1]:5701 [my-cluster] [3.7.1] Dropped: JoinMessage{packetVersion=4, buildNumber=20160905, address=[172.17.0.1]:5701, uuid='ab3763a6-c877-4d1c-8143-a049db5a4aa5', liteMember=false, memberCount=0, dataMemberCount=0} 2016-10-16 16:48:40.990 DEBUG hz._hzInstance_1_my-cluster.MulticastThread [172.17.0.1]:5701 [my-cluster] [3.7.1] Dropped: JoinMessage{packetVersion=4, buildNumber=20160905, address=[172.17.0.1]:5701, uuid='ab3763a6-c877-4d1c-8143-a049db5a4aa5', liteMember=false, memberCount=0, dataMemberCount=0} 2016-10-16 16:48:41.140 INFO hz._hzInstance_1_my-cluster.IO.thread-Acceptor [172.17.0.1]:5701 [my-cluster] [3.7.1] Accepting socket connection from /192.168.254.129:38008 2016-10-16 16:48:41.302 INFO hz._hzInstance_1_my-cluster.cached.thread-3 [172.17.0.1]:5701 [my-cluster] [3.7.1] Established socket connection between /172.17.0.1:5701 and /192.168.254.129:38008 2016-10-16 16:48:42.005 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Handling join from [172.17.0.1]:5702, joinInProgress: false 2016-10-16 16:48:42.018 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Handling join from [172.17.0.1]:5702, joinInProgress: false, timeToStart: 4988 2016-10-16 16:48:43.003 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Handling join from [172.17.0.1]:5702, joinInProgress: false, timeToStart: 4003 2016-10-16 16:48:44.002 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Handling join from [172.17.0.1]:5702, joinInProgress: false, timeToStart: 3003 2016-10-16 16:48:45.004 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Handling join from [172.17.0.1]:5702, joinInProgress: false, timeToStart: 2002 2016-10-16 16:48:46.005 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Handling join from [172.17.0.1]:5702, joinInProgress: false, timeToStart: 1001 2016-10-16 16:48:47.015 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Handling join from [172.17.0.1]:5702, joinInProgress: false, timeToStart: -10 2016-10-16 16:48:47.015 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Starting join... 2016-10-16 16:48:47.017 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Received heartbeat from Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941 (now: 2016-10-16 16:48:47.017, timestamp: 2016-10-16 16:48:47.017) 2016-10-16 16:48:47.017 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] MasterConfirmation has been received from Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941 2016-10-16 16:48:47.017 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Updating members [Member [172.17.0.1]:5701 - ab3763a6-c877-4d1c-8143-a049db5a4aa5 this, Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941] 2016-10-16 16:48:47.017 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Adding Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941 2016-10-16 16:48:47.020 INFO hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Members [2] { Member [172.17.0.1]:5701 - ab3763a6-c877-4d1c-8143-a049db5a4aa5 this Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941 }
Heartbeatのログ。
2016-10-16 16:48:47.029 DEBUG hz._hzInstance_1_my-cluster.generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Received heartbeat from Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941 (now: 2016-10-16 16:48:47.029, timestamp: 2016-10-16 16:48:47.019) 2016-10-16 16:48:50.747 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Received heartbeat from Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941 (now: 2016-10-16 16:48:50.747, timestamp: 2016-10-16 16:48:50.741) 2016-10-16 16:48:55.747 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Received heartbeat from Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941 (now: 2016-10-16 16:48:55.746, timestamp: 2016-10-16 16:48:55.744) 2016-10-16 16:49:00.747 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Received heartbeat from Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941 (now: 2016-10-16 16:49:00.747, timestamp: 2016-10-16 16:49:00.745) 2016-10-16 16:49:05.746 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Received heartbeat from Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941 (now: 2016-10-16 16:49:05.746, timestamp: 2016-10-16 16:49:05.743) 2016-10-16 16:49:10.747 DEBUG hz._hzInstance_1_my-cluster.priority-generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] Received heartbeat from Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941 (now: 2016-10-16 16:49:10.746, timestamp: 2016-10-16 16:49:10.745) 2016-10-16 16:49:10.750 DEBUG hz._hzInstance_1_my-cluster.generic-operation.thread-0 [172.17.0.1]:5701 [my-cluster] [3.7.1] MasterConfirmation has been received from Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941
Node離脱時のログ。
2016-10-16 16:50:06.987 DEBUG hz._hzInstance_1_my-cluster.generic-operation.thread-3 [172.17.0.1]:5701 [my-cluster] [3.7.1] Removing member [172.17.0.1]:5702, uuid: 96255746-dcb7-4035-b7fb-6598fd38b941, requested by: [172.17.0.1]:5702 2016-10-16 16:50:06.988 INFO hz._hzInstance_1_my-cluster.generic-operation.thread-3 [172.17.0.1]:5701 [my-cluster] [3.7.1] Connection[id=1, /172.17.0.1:5701->/192.168.254.129:38008, endpoint=[172.17.0.1]:5702, alive=false, type=MEMBER] closed. Reason: Removing member [172.17.0.1]:5702, uuid: 96255746-dcb7-4035-b7fb-6598fd38b941, requested by: [172.17.0.1]:5702 2016-10-16 16:50:06.991 INFO hz._hzInstance_1_my-cluster.generic-operation.thread-3 [172.17.0.1]:5701 [my-cluster] [3.7.1] Removing Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941 2016-10-16 16:50:06.991 DEBUG hz._hzInstance_1_my-cluster.event-5 [172.17.0.1]:5701 [my-cluster] [3.7.1] Removed connection [172.17.0.1]:5702 2016-10-16 16:50:06.991 DEBUG hz._hzInstance_1_my-cluster.generic-operation.thread-3 [172.17.0.1]:5701 [my-cluster] [3.7.1] Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941 is dead, sending remove to all other members... 2016-10-16 16:50:06.992 DEBUG hz._hzInstance_1_my-cluster.generic-operation.thread-3 [172.17.0.1]:5701 [my-cluster] [3.7.1] Removing Member [172.17.0.1]:5702 - 96255746-dcb7-4035-b7fb-6598fd38b941 2016-10-16 16:50:06.994 INFO hz._hzInstance_1_my-cluster.generic-operation.thread-3 [172.17.0.1]:5701 [my-cluster] [3.7.1] Members [1] { Member [172.17.0.1]:5701 - ab3763a6-c877-4d1c-8143-a049db5a4aa5 this }
と、ロギング事情はこのくらいにして、先へ進めましょう。
Node Discovery
HazelcastはNodeを起動すると、指定された方法に沿って他のNodeを見つけ出し、クラスタを構成します。
このNodeをどうやって見つけているのか?という話です。
この仕組みを提供しているのが、Joinerというインターフェースです。Hazelcastには、デフォルトでマルチキャストとTCP/IPの2つがNode Discoveryの仕組みとして組み込まれています。さらにデフォルトで選択されているのは、マルチキャストです。
Hazelcast起動時、Nodeクラスのインスタンス起動時に、このJoinerインターフェースのjoinメソッドを呼び出し、既存クラスタに参加しようとします。
また、マルチキャストとTCP/IP以外にもプラグインとして他のJoinerの実装が提供されており、以下が利用できます。
- EC2
- Azure
- jclouds
Hazelcastは、データの持ち主(Primary PartitionやBackup Partition)の概念はあるものの、各Nodeは平等のようなイメージがあるかもしれませんが、実際にはMaster Node(ドキュメント中ではLeaderと書かれていますが、ソースコード中ではMasterと記載)があり、Master Nodeがクラスタの管理をしています。
クラスタ構成時、他に誰もNodeがいなかった場合は、最初のNodeがMaster Nodeとなります。
Node DiscoveryとはMaster Nodeを探し、クラスタ参加リクエストを送って既存クラスタに参加することに他なりません。
まずは、マルチキャストを使ったNode Discoveryについて見ていきましょう。
Hazelcastの内部クラスの中心にいるクラスのひとつとして、Nodeがありますが、このNodeがJoinerのインスタンスを保持しています。どのようなJoinerを持つかは、設定に依存します。マルチキャストを使った場合は、MulticastJoinerとなります。
MulticastJoinerを使った場合は、以下の図に登場するクラスが中心に動きます。
まず、MulticastJoinerがJoinRequestをMulticastServiceを介してマルチキャストで送信します。MulticastServiceはRunnableでもあり、Node起動時にマルチキャストメッセージに対応するListenerを紐付けて起動しています。このListenerがJoinRequestを受け取り、自分がMaster Nodeであった場合は自分のAddressを乗せてJoinMessageを送り返します。
JoinMessageに乗った情報を、Listenerが再度受け取り、NodeにMaster Addressとして設定します。
ところで、Hazelcastを使ってクラスタを構成すると、こんな表示を見ることが多いと思います。
Members [2] { Member [172.17.0.1]:5701 this Member [172.17.0.1]:5702 }
ここで表示されているポート(ここでは、5701と5702)は、クラスタ内のNodeの通信用ポートです(ドキュメントでは、communicate〜と書かれているので、便宜的にCommunication Portと呼びましょう)。
マルチキャストの場合は、ここまではCommunication Portは出てきません。TCP/IPの場合は、即Communication Portが出てきます。
TCP/IPの場合は、ClusterJoinManagerを使い、設定で利用可能なアドレスの範囲にJoinMessageを乗せてMasterDiscoveryOperationをシリアライズして送信します。この時の送信先のポートは、Communication Portを使ってAuto Incrementや設定を考慮した内容で決められます。MasterDiscoveryOperationを受け取った各Nodeは、自分がMasterであればやはりAddressを送り返します。送り返されるAddressは、SetMasterOperationに乗ってやってきてNodeに設定される流れになります。SetMasterOperationを送り返す時も、同様にClusterJoinManagerが使用されます。
ここまでで、Master NodeのAddressがわかります。
クラスタに参加するには、ここからClusterJoinManager越しに、JoinRequestOperationを送って完了となります。これについては、マルチキャストであろうとも単一Nodeに向けてのOperationとなります。ここで使用するのは、マルチキャストであってもTCP/IPであってもCommunication Portです。
ところで、Master Nodeが見つからないとはどういう状況を意味するのか?ですが、マルチキャストの場合は既定の試行回数JoinRequestを送って、それでもMaster Nodeが見つからない場合は、自分がMaster Nodeとなります。TCP/IPの場合は、既定の時間が過ぎてもMaster Nodeが見つからなかった場合、もしくは自分がAddress上、もっとも高い位置にいる場合は自分がMaster Nodeとなります。
そして、すべてのJoinerを無効にすると
<network> <join> <multicast enabled="false"/> <tcp-ip enabled="false"/> </join> </network>
クラスタへの参加手段を失うため、スタンドアロンな状態で起動することになります。この場合、起動速度も速くなります。
警告: [172.17.0.1]:5701 [dev] [3.7.1] No join method is enabled! Starting standalone.
以上が、Node Discoveryです。
Discovery SPI
ところで、ここまでで解説したJoinerインターフェースは、実はHazelcast 3.7から非推奨になっています。
では、Joinerの役割は誰が引き継ぐのかというと、Discovery SPIというものが開発されています。
現在、Hazelcastに同梱されているDiscovery SPIの実装は、マルチキャストのみであり、またDiscovery SPIを使用するためには通常のJoinerの設定はすべて無効にし、Discovery SPIの明示的な有効化が必要です。
コミュニティベースのものも含め、
- jclouds
- Consul
- etcd
- Eureka
- Kubernetes
- ZooKeeper
と出てきているので、そのうちDiscovery SPIに移っていくのでしょう。
Heartbeat
続いては、Nodeの死活監視のためのHeartbeatについてです。Hazelcastでは、Node間で定期的にHeartbeatを行って死活監視を行っています。
Hazelcastでの死活監視は、以下の3つで行われます。
- (ICMPを有効にした場合)PING
- HeartbeatOperation
- MasterConfirmatinoOperation
デフォルトでは、HeartbeatOperationとMasterConfirmationOperationが実行されます。PINGは、設定しておけば追加される感じです。
このうち、PINGとHeartbeatOperationはセットで実行されます。
Heartbeatまわりのクラス構成は、簡単には以下のようになっています。
HeartbeatOperation(とPING)、そしてMasterConfirmationOperationは定期的にExecutorServiceで実行されます。この処理を行うのは、ClusterHeartbeatManagerです。
Heartbeat自体は、各Nodeが自Nodeを除く全NodeにHeartbeatOperationとして送信し、その時のタイムスタンプを送信先のNodeで保持しておきます(Operationは、Remote Nodeで実行されます)。次回のHeartbeat送信時に各Nodeから最後に受け取ったタイムスタンプが閾値を越えている場合は、対象のNodeがクラスタから切り離されます。
MasterConfirmationOperationの場合は、各NodeがMaster Nodeに向けてOperationを送信し、Master Nodeで保持しているタイムスタンプを更新します。
MasterConfirmationOperationの結果の確認ですが、Master Nodeとそれ以外のNodeで動作が異なり、Master Nodeのみ他のNodeから送信されてきたMasterConfirmationOperationで更新されたタイムスタンプを確認し、通常のHeartbeatと同じように閾値以上の時間、MasterConfirmationOperationによる更新がなければ、やはり対象のNodeをクラスタから切り離します。
なお、PINGについては応答がないNodeについては指定の回数リトライし、それでもPINGに応答がなければその時点でクラスタから切り離します。
RPC
最後は、RPCです。といっても、ここはいろいろありそうなので、さわりだけ。
ここでは、Communication Portを使った話がメインです。
Hazelcastでは、NIOのServerSocketChannelを使って、自前でリクエストを処理するコードを書いています。前述しましたが、ここでNettyなどは使っていません。
Hazelcastの起動時に、割り当てられたCommunication PortでServerSocketChannelを開き、これをTcpIpConnectionManagerで管理します。が、実際に接続してきたSocketChannelをさばくのは、SocketAcceptorThreadになります。Selectorをopenして、NIOで出てくるSelectionKeyなどを扱うのも、SocketAcceptorThreadです。
クライアントからの接続があったら、そのSocketChannelをTcpIpConnectionでラップし、以降の処理はSocketAcceptorThreadから切り離します。TcpIpConnectionに対する読み書きは、NonBlockingIOThread、つまり別スレッドで行います。読み込みはSocketReader、書き込みはSocketWriterで、それぞれNonBlogkingIOThreadが別々に割り当てられます。割り当てられますといってもひとつの接続に対して読み書きで2本、という使い方ではなくひとつのスレッドでイベントがあった接続を処理して切り替えていく感じですね。
なお、このNonBlockingIOThreadの生成は、IOThreadingModelインターフェースを実装したNonBlockingIOThreadingModelで行います。
クライアント側からのデータ送信を行う時も、同じ仕組みが登場します。Socketのリッスンはクライアントの文脈では出てこないのでSocketAcceptorThreadは登場しませんが、接続をTcpIpConnectionとして表現したり、読み書きをSocketReader/SocketWriterで行うのも同じです。
ところで、前回、例えばDistributed Mapに対するgetやputなどの操作は、Operationとして表現されると書きました。
HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(); IMap<String,String> map = hazelcast.getMap("default"); map.put("key1", "value1"); map.get("key1");
putであれば、PutOperationとして表現されます。
このOperationですが、Remote Nodeに送信される時はシリアライズして送られます。この時、Packetという形式でラップされて送られるようです。
Remote Nodeでは、SocketAcceptorThreadで受け付け、TcpIpConnectionに対してSocketReader(の実装であるNonBlockingSocketReader)がPacketを読み出しOperationExecutorImplに引き渡します。
ここですぐに実行するのではなく、OperationをタスクとしてQueueに積みます。
Queueは2種類あって、PartitionのIdを意識するタスクは、Partitionに応じた処理を行うPartitionOperationThreadの個々のインスタンスが保持するQueueに積まれます。Partitionに依存しないようなタスクの場合(Operationが持つPartitionIdが-1を指している場合)は、OperationExecutorImplが持つQueueに積まれます。
あとは、各ThreadがQueueからタスクを取り出して処理を行う、と。
Partitionを意識するOperationは主にデータを操作するもので、例えばPutOperationなどです。Partitionを意識しないOperationは、Node自体の話になります。例えば、MasterConfirmationOperationなどです。
この後のPartitionに応じたデータの保存などは、前回のエントリに書いてあるので、そちらをご覧いただくとよいでしょう。
まとめ
2回にわたって、Hazelcastの内部をちょっと追ってみました。
なかなか大変というか、ネットワークまわりはやっぱり追うのが大変でしたが、ある程度わかったのでまあよかったのかな?
それなりに書くのに時間はかかりましたが、追ってみてよかったです。
なお、この2本のエントリを書いたきっかけは、こちらのツイートです。
Payara MicroはHazelcastでクラスタリングするから、Hazelcastの内部動作(とか実際の通信のキャプチャ内容)なんかにも触れた方がいいかもしれないけど、そういうのはCLOVERでやってよ的な他力本願なところもあったりする…
やろうかな、どうしようかなと前々から思っていたテーマだったので、いいきっかけになりました(笑)。
ありがとうございます。