Hazelcast 3.8のリリースで追加された、Schduled Executor Serviceを試してみようと思います。
現時点のHazelcastのバージョンは3.9.1なので、少し前に追加された機能にあたりますが、ちょっと気になっていたので
触っておこうかと。
http://docs.hazelcast.org/docs/rn/3.8.html
Hazelcast/Scheduled Executor Service
ScheduledExecutorServiceといえば、java.util.concurrentパッケージに同名のインターフェースがありますが、
ScheduledExecutorService (Java Platform SE 8)
これのHazelcast版と思っていただければ問題ありません。
IScheduledExecutorService (Hazelcast Root 3.9.1 API)
機能としては
- schedule … 指定した遅延時間の後、タスクを実行(単発)
- scheduleAtFixedRate … 指定した遅延時間の後、指定した期間ごとにタスクを実行(繰り返し)
の2つのメソッドに対して
- schedule … クラスタ内のいずれかのMember上でタスクを実行
- scheduleOnMember: クラスタ内の指定したMember上でタスクを実行
- scheduleOnKeyOwner: クラスタ内の、指定したキーのオーナーであるMember上でタスクを実行
- scheduleOnMembers: クラスタ内の、指定したMember(複数可)上でタスクを実行
- scheduleOnAllMembers: クラスタ内の全Member上でタスクを実行
の5つの組み合わせの中から実行パターンを選ぶことができます。
なお、java.util.concurrent.ScheduledExecutorServiceが提供している、scheduleWithFixedDelayメソッドについては
サポートしていません。
とまあこんな概要なのですが、とりあえず使ってみましょうか。
準備
Maven依存関係は、こちら。
<dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast</artifactId> <version>3.9.1</version> </dependency>
あと、クラスタを構成するために、浮いていてもらうNodeを作っておきましょう。
src/main/java/org/littlewings/hazelcast/distexec/EmbeddedHazelcastServer.java
package org.littlewings.hazelcast.distexec; import java.time.LocalDateTime; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; public class EmbeddedHazelcastServer { public static void main(String... args) { HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(); try { System.out.printf("[%s] startup, Embedded Hazelcast Server.%n", LocalDateTime.now()); System.console().readLine("> Enter stop."); } finally { hazelcast.shutdown(); Hazelcast.shutdownAll(); } } }
起動。2 Node浮いていてもらいます。
## Node 1 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.distexec.EmbeddedHazelcastServer ## Node 2 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.distexec.EmbeddedHazelcastServer
クラスタが構成されました。
Members {size:2, ver:2} [ Member [172.22.0.1]:5702 - 2b49f301-3dd2-4b6a-b7a8-5e955006e938 Member [172.22.0.1]:5701 - d769e220-53b1-45b7-889a-9936a53ddb60 this ]
ここから先は、このクラスタに参加して、タスクを放り込むNodeを作成します。
はじめてのHazelcast Scheduled Executor Service
基本的な使い方はjava.util.concurrent.ScheduledExecutorServiceと同じなので、RunnableやCallableをタスクとして
放り込むことになります。
簡単なRunnableを作ってみます。
src/main/java/org/littlewings/hazelcast/distexec/SayHelloTask.java
package org.littlewings.hazelcast.distexec; import java.io.Serializable; import java.time.LocalDateTime; public class SayHelloTask implements Runnable, Serializable { @Override public void run() { System.out.printf("[%s] Hello!!%n", LocalDateTime.now()); } }
まあ大したことのないコードですが、Serializableを実装しているところがポイントです。
最初は、ただのscheduleメソッドでどこかのMember上で動かしてみましょう。
src/main/java/org/littlewings/hazelcast/distexec/SingleMemberTaskExecutor.java
package org.littlewings.hazelcast.distexec; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.scheduledexecutor.IScheduledExecutorService; import com.hazelcast.scheduledexecutor.IScheduledFuture; public class SingleMemberTaskExecutor { public static void main(String... args) throws ExecutionException, InterruptedException { HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(); try { IScheduledExecutorService es = hazelcast.getScheduledExecutorService("default"); IScheduledFuture<?> future = es.schedule(new SayHelloTask(), 5, TimeUnit.SECONDS); future.get(); } finally { hazelcast.shutdown(); Hazelcast.shutdownAll(); } } }
作成したRunnableをIScheduledExecutorService#scheduleに放り込めばいいのですが、返ってくるのはIScheduledFutureです。
ふつうのFutureと同じく、getで待ち合わせをします。
IScheduledExecutorService es = hazelcast.getScheduledExecutorService("default"); IScheduledFuture<?> future = es.schedule(new SayHelloTask(), 5, TimeUnit.SECONDS); future.get();
では、実行。
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.distexec.SingleMemberTaskExecutor
クラスタが構成されて
Members {size:3, ver:5} [ Member [172.22.0.1]:5702 - 2b49f301-3dd2-4b6a-b7a8-5e955006e938 Member [172.22.0.1]:5701 - d769e220-53b1-45b7-889a-9936a53ddb60 Member [172.22.0.1]:5703 - 1552aba3-b350-41df-85ac-9f31fb560e07 this ]
今回は、最初に起動したNodeで動作しました(この見た目ではわかりませんが)。
[2017-12-08T23:29:06.150] Hello!!
今度はCallableで
次は、Callableで試してみます。
src/main/java/org/littlewings/hazelcast/distexec/HelloCallableTask.java
package org.littlewings.hazelcast.distexec; import java.io.Serializable; import java.time.LocalDateTime; import java.util.concurrent.Callable; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceAware; public class HelloCallableTask implements Callable<String>, HazelcastInstanceAware, Serializable { transient HazelcastInstance hazelcast; @Override public void setHazelcastInstance(HazelcastInstance hazelcast) { this.hazelcast = hazelcast; } @Override public String call() throws Exception { return String.format("[%s] Hello from %s!!", LocalDateTime.now(), hazelcast.getCluster().getLocalMember().getUuid()); } }
こちらも大したことのない処理ですが、今回はHazelcastInstanceAwareインターフェースを実装し、HazelcastInstanceを
受け取れるようにしました。こちらを使って、自分のUUIDを返却するようにしています。
なお、HazelcastInstanceはSerializableではないので、transientにしておく必要があります。
transient HazelcastInstance hazelcast;
こちらのCallableを使ってみましょう。
public class SingleMemberCallableTaskExecutor { public static void main(String... args) throws ExecutionException, InterruptedException { HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(); try { IScheduledExecutorService es = hazelcast.getScheduledExecutorService("default"); IScheduledFuture<String> future = es.schedule(new HelloCallableTask(), 5, TimeUnit.SECONDS); System.out.println(future.get()); } finally { hazelcast.shutdown(); Hazelcast.shutdownAll(); } } }
結果。
## 実行 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.distexec.SingleMemberCallableTaskExecutor ## クラスタ Members {size:3, ver:7} [ Member [172.22.0.1]:5702 - 2b49f301-3dd2-4b6a-b7a8-5e955006e938 Member [172.22.0.1]:5701 - d769e220-53b1-45b7-889a-9936a53ddb60 Member [172.22.0.1]:5703 - b24c4140-f2b6-4bbe-96ab-66a088b03f0c this ] ## 結果 [2017-12-08T23:34:28.163] Hello from 2b49f301-3dd2-4b6a-b7a8-5e955006e938!!
結果は、2番目に起動したNodeから返ってきたようです。
もっとバリエーション
もうちょっといろいろ試してみましょう。
今度は、RunnableでもHazelcastInstanceAwareインターフェースを実装したクラスを作成してみます。
src/main/java/org/littlewings/hazelcast/distexec/HelloRunnableTask.java
package org.littlewings.hazelcast.distexec; import java.io.Serializable; import java.time.LocalDateTime; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceAware; public class HelloRunnableTask implements Runnable, HazelcastInstanceAware, Serializable { transient HazelcastInstance hazelcast; @Override public void setHazelcastInstance(HazelcastInstance hazelcast) { this.hazelcast = hazelcast; } @Override public void run() { System.out.printf("[%s] Hello from %s%n", LocalDateTime.now(), hazelcast.getCluster().getLocalMember().getUuid()); } }
こちらを、あるキーのオーナーであるMember上で実行。
public class OnKeyOwnerTaskExecutor { public static void main(String... args) throws ExecutionException, InterruptedException { HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(); try { String key = "key1"; IScheduledExecutorService es = hazelcast.getScheduledExecutorService("default"); IScheduledFuture<?> future = es.scheduleOnKeyOwner(new HelloRunnableTask(), key, 5, TimeUnit.SECONDS); future.get(); } finally { hazelcast.shutdown(); Hazelcast.shutdownAll(); } } }
実行。
## 実行 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.distexec.OnKeyOwnerTaskExecutor ## クラスタ Members {size:3, ver:9} [ Member [172.22.0.1]:5702 - 2b49f301-3dd2-4b6a-b7a8-5e955006e938 Member [172.22.0.1]:5701 - d769e220-53b1-45b7-889a-9936a53ddb60 Member [172.22.0.1]:5703 - 53afbfbc-0c4d-4a3d-9ce6-a67bad9f8e1d this ] ## 結果(自Node) [2017-12-08T23:37:24.953] Hello from 53afbfbc-0c4d-4a3d-9ce6-a67bad9f8e1d
今回は、自Nodeで動作しました…。
タスクの定義はこれくらいにして、他のバリエーションも。
クラスタ内の、複数のMember上でタスクを実行。
src/main/java/org/littlewings/hazelcast/distexec/ClusterMembersCallableTaskExecutor.java
package org.littlewings.hazelcast.distexec; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.Member; import com.hazelcast.scheduledexecutor.IScheduledExecutorService; import com.hazelcast.scheduledexecutor.IScheduledFuture; public class ClusterMembersCallableTaskExecutor { public static void main(String... args) throws ExecutionException, InterruptedException { HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(); try { Set<Member> members = new HashSet<>(hazelcast.getCluster().getMembers()); members.remove(hazelcast.getCluster().getLocalMember()); IScheduledExecutorService es = hazelcast.getScheduledExecutorService("default"); Map<Member, IScheduledFuture<String>> futures = es.scheduleOnMembers(new HelloCallableTask(), members, 5, TimeUnit.SECONDS); for (Map.Entry<Member, IScheduledFuture<String>> entry : futures.entrySet()) { System.out.printf("member[%s] from message = %s%n", entry.getKey().getUuid(), entry.getValue().get()); } System.out.printf("self = %s%n", hazelcast.getCluster().getLocalMember().getUuid()); } finally { hazelcast.shutdown(); Hazelcast.shutdownAll(); } } }
今回は、自Member以外のNodeで実行してみることにしました。
Set<Member> members = new HashSet<>(hazelcast.getCluster().getMembers());
members.remove(hazelcast.getCluster().getLocalMember());
IScheduledExecutorService#scheduleOnMembersで、指定したMember上で実行しますが、この場合はMemberをキー、IScheduledFutureを
値にしたMapが返ってきます。
IScheduledExecutorService es = hazelcast.getScheduledExecutorService("default"); Map<Member, IScheduledFuture<String>> futures = es.scheduleOnMembers(new HelloCallableTask(), members, 5, TimeUnit.SECONDS);
実行。
## 実行 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.distexec.ClusterMembersCallableTaskExecutor ## クラスタ Members {size:3, ver:11} [ Member [172.22.0.1]:5702 - 2b49f301-3dd2-4b6a-b7a8-5e955006e938 Member [172.22.0.1]:5701 - d769e220-53b1-45b7-889a-9936a53ddb60 Member [172.22.0.1]:5703 - 4a100710-4c28-4e6e-88c1-a1e1926d7f3a this ] ## 結果 member[d769e220-53b1-45b7-889a-9936a53ddb60] from message = [2017-12-08T23:41:50.435] Hello from d769e220-53b1-45b7-889a-9936a53ddb60!! member[2b49f301-3dd2-4b6a-b7a8-5e955006e938] from message = [2017-12-08T23:41:50.440] Hello from 2b49f301-3dd2-4b6a-b7a8-5e955006e938!! self = 4a100710-4c28-4e6e-88c1-a1e1926d7f3a
最後に出力しているのは自MemberのUUIDですが、それ以外のNodeから結果が返ってきました。
繰り返し実行
最後は、繰り返しタスクを実行してみます。
src/main/java/org/littlewings/hazelcast/distexec/AllMemberRepeatTaskExecutor.java
package org.littlewings.hazelcast.distexec; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.Member; import com.hazelcast.scheduledexecutor.IScheduledExecutorService; import com.hazelcast.scheduledexecutor.IScheduledFuture; public class AllMemberRepeatTaskExecutor { public static void main(String... args) throws ExecutionException, InterruptedException { HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(); try { IScheduledExecutorService es = hazelcast.getScheduledExecutorService("default"); Map<Member,IScheduledFuture<?>> futures = es.scheduleOnAllMembersAtFixedRate(new HelloRunnableTask(), 5, 5, TimeUnit.SECONDS); System.console().readLine("> Enter stop tasks."); for (Map.Entry<Member, IScheduledFuture<?>> entry : futures.entrySet()) { entry.getValue().cancel(false); } es.shutdown(); System.out.println("Task Finish!!"); } finally { hazelcast.shutdown(); Hazelcast.shutdownAll(); } } }
繰り返しタスクを実行するには、IScheduledExecutorService#schedule〜AtFixedRateを使用します。今回は、
scheduleOnAllMembersAtFixedRateを使用して全Member上で動かすことにしました。
Map<Member,IScheduledFuture<?>> futures = es.scheduleOnAllMembersAtFixedRate(new HelloRunnableTask(), 5, 5, TimeUnit.SECONDS);
この場合は、Runnableしか渡すことができません(java.util.concurrent.ScheduledExecutorServiceも同様)。
また、タスクを延々と実行し続けるので、Enterを押したら停止するようにしました。IScheduledFuture#cancelで
タスクをキャンセルできるのですが、cancelに渡す引数はfalseでなければなりません。
System.console().readLine("> Enter stop tasks."); for (Map.Entry<Member, IScheduledFuture<?>> entry : futures.entrySet()) { entry.getValue().cancel(false); } es.shutdown();
trueを渡すと、UnsupportedOperationExceptionが飛んできます。
https://github.com/hazelcast/hazelcast/blob/v3.9.1/hazelcast/src/main/java/com/hazelcast/scheduledexecutor/impl/ScheduledFutureProxy.java#L108-L113
では、実行。
## 実行 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.distexec.AllMemberRepeatTaskExecutor ## クラスタ Members {size:3, ver:13} [ Member [172.22.0.1]:5702 - 2b49f301-3dd2-4b6a-b7a8-5e955006e938 Member [172.22.0.1]:5701 - d769e220-53b1-45b7-889a-9936a53ddb60 Member [172.22.0.1]:5703 - f4d079d4-5427-4080-a01f-7129a7ca36b3 this ] ## 結果 ### Node 1 [2017-12-08T23:48:21.983] Hello from d769e220-53b1-45b7-889a-9936a53ddb60 [2017-12-08T23:48:26.983] Hello from d769e220-53b1-45b7-889a-9936a53ddb60 [2017-12-08T23:48:31.983] Hello from d769e220-53b1-45b7-889a-9936a53ddb60 [2017-12-08T23:48:36.983] Hello from d769e220-53b1-45b7-889a-9936a53ddb60 [2017-12-08T23:48:41.983] Hello from d769e220-53b1-45b7-889a-9936a53ddb60 ### Node 2 [2017-12-08T23:48:21.978] Hello from 2b49f301-3dd2-4b6a-b7a8-5e955006e938 [2017-12-08T23:48:26.978] Hello from 2b49f301-3dd2-4b6a-b7a8-5e955006e938 [2017-12-08T23:48:31.978] Hello from 2b49f301-3dd2-4b6a-b7a8-5e955006e938 [2017-12-08T23:48:36.978] Hello from 2b49f301-3dd2-4b6a-b7a8-5e955006e938 [2017-12-08T23:48:41.978] Hello from 2b49f301-3dd2-4b6a-b7a8-5e955006e938 ### 自Node [2017-12-08T23:48:22.014] Hello from f4d079d4-5427-4080-a01f-7129a7ca36b3 [2017-12-08T23:48:26.985] Hello from f4d079d4-5427-4080-a01f-7129a7ca36b3 [2017-12-08T23:48:31.985] Hello from f4d079d4-5427-4080-a01f-7129a7ca36b3 [2017-12-08T23:48:36.985] Hello from f4d079d4-5427-4080-a01f-7129a7ca36b3 [2017-12-08T23:48:41.985] Hello from f4d079d4-5427-4080-a01f-7129a7ca36b3
適当なところでEnterを押すと、すべてのIScheduledFutureをキャンセルするので、タスクが終了します。
ここで、キャンセルせずに起動元を止めても、タスクが他Nodeでそのまま走り続けます…。
とりあえず、基本的なバリエーションは確認してみたというところですね。
設定
Scheduled Executor Serviceは、いくつか設定項目があります。
Configuring Scheduled Executor Service
設定可能なのは
- pool-size … Member単位のExecutorが持つスレッド数。デフォルト16
- durability … Executorの耐久度(Nodeダウンに備えて、タスクをどれだけバックアップするか)。デフォルト1
- capacity … Partitionごとにタスクを保持できる最大数。これを越えてタスクを追加しようとすると、RejectedExecutionExceptionが発生する。デフォルト100
の3つです。
とりあえず、設定してみましょう。
src/main/resources/hazelcast.xml
<?xml version="1.0" encoding="UTF-8"?> <hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.9.xsd" xmlns="http://www.hazelcast.com/schema/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <scheduled-executor-service name="configuredScheduledExecSvc"> <pool-size>32</pool-size> <durability>2</durability> <capacity>200</capacity> </scheduled-executor-service> <scheduled-executor-service name="defaultValuedScheduledExecSvc"> <pool-size>16</pool-size> <durability>1</durability> <capacity>100</capacity> </scheduled-executor-service> </hazelcast>
2つ目の設定は、デフォルト値を明示的に設定したものです。
ちょっとデフォルト値と、設定した値を表示してみましょう。
src/main/java/org/littlewings/hazelcast/distexec/ConfiguredExecutorServiceRunner.java
package org.littlewings.hazelcast.distexec; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import com.hazelcast.config.ScheduledExecutorConfig; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.scheduledexecutor.IScheduledExecutorService; import com.hazelcast.scheduledexecutor.IScheduledFuture; public class ConfiguredExecutorServiceRunner { public static void main(String... args) throws ExecutionException, InterruptedException { HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(); try { ScheduledExecutorConfig defaultConfig = hazelcast.getConfig().getScheduledExecutorConfig("default"); System.out.printf("default pool-size = %d%n", defaultConfig.getPoolSize()); System.out.printf("default durability = %d%n", defaultConfig.getDurability()); System.out.printf("default capacity = %d%n", defaultConfig.getCapacity()); ScheduledExecutorConfig customConfig = hazelcast.getConfig().getScheduledExecutorConfig("configuredScheduledExecSvc"); System.out.printf("custom pool-size = %d%n", customConfig.getPoolSize()); System.out.printf("custom durability = %d%n", customConfig.getDurability()); System.out.printf("custom capacity = %d%n", customConfig.getCapacity()); IScheduledExecutorService es = hazelcast.getScheduledExecutorService("configuredScheduledExecSvc"); IScheduledFuture<?> future = es.schedule(new HelloRunnableTask(), 0, TimeUnit.SECONDS); future.get(); } finally { hazelcast.shutdown(); Hazelcast.shutdownAll(); } } }
結果。
## 実行 mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.distexec.ConfiguredExecutorServiceRunner ## 結果 default pool-size = 16 default durability = 1 default capacity = 100 custom pool-size = 32 custom durability = 2 custom capacity = 200
ちゃんと、設定されていますよ、と。
もう少し深堀り
ところで、このScheduled Executor Serviceってどうやって実現されているんでしょう?
内部的には、タスクが依頼された各Nodeで、ローカルなjava.util.concurrent.ScheduledExecutorServiceを動かしています。
https://github.com/hazelcast/hazelcast/blob/v3.9.1/hazelcast/src/main/java/com/hazelcast/util/executor/LoggingScheduledExecutor.java#L51
これらのScheduledExecutorServiceを作成しているのは、このあたりですね。
https://github.com/hazelcast/hazelcast/blob/v3.9.1/hazelcast/src/main/java/com/hazelcast/spi/impl/executionservice/impl/ExecutionServiceImpl.java#L141-L150
また、単にscheduleメソッドを呼び出した時にタスクを実行するMemberは、どのように決まるのでしょう?
この場合は自前でUUIDを生成して、それをキーにして動作させるPartitionId(=Member)を決定します。
https://github.com/hazelcast/hazelcast/blob/v3.9.1/hazelcast/src/main/java/com/hazelcast/scheduledexecutor/impl/ScheduledExecutorServiceProxy.java#L417-L424
https://github.com/hazelcast/hazelcast/blob/v3.9.1/hazelcast/src/main/java/com/hazelcast/scheduledexecutor/impl/ScheduledExecutorServiceProxy.java#L104-L105
https://github.com/hazelcast/hazelcast/blob/v3.9.1/hazelcast/src/main/java/com/hazelcast/scheduledexecutor/impl/ScheduledExecutorServiceProxy.java#L391-L393
要するに、scheduleOnKeyOwnerと比べると明示的にキーを指定するか、Hazelcastに勝手にUUIDを振ってもらうかの差、ということですね。
まとめ
Hazelcast 3.8で追加された、Scheduled Executor Serviceを試してみました。
java.util.concurrent.ScheduledExecutorServiceをHazelcast上で実行できるようにしたものですが、知っていると役に立つ…かも?
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-scheduled-distexec