ちょっと思うところがありまして、HazelcastのEntry Processorを少し詳しく見ていこうかなと思います。
なお、HazelcastのEntry Processor自体は前に試したことがあるのですが、今回はもっと内部動作やData Affinity的な観点も入れつつ。
HazelcastのEntry Processorを使う - CLOVER
Entry Processor?
自分に向けてのおさらい的に。
Hazelcastには、Entry Processorと呼ばれるものが2つあります。ひとつはDistributed Mapに対するもので、もうひとつはJCacheの実装として
使う時のものです。
Distributed Map / Entry Processor
JCache / Implementing EntryProcessor
今回は、Distributed Mapの方を対象にします(Distributed MapとJCacheでは、実装も違うので)。
Distributed MapにおけるEntry Proessorは、EntryProcessorインターフェースを実装したクラスを作成し、そのインスタンスをIMap#各種executeメソッドに
渡すことによって動作します。
EntryProcessor (Hazelcast Root 3.9.2 API)
IMap (Hazelcast Root 3.9.2 API)
Entry Processorを使うことによる利点は、通常Distributed Mapに対してエントリをgetしてputするような複合的な操作をしたりする場合に、この処理を実行した
Nodeがデータのオーナーでなかった場合はネットワークアクセスが発生しますが、EntryProcessorを使用するとデータのオーナーであるNode上で
処理を実行してくれるため高速に動作することです。
また、ロックや並行性の問題を気にすることなく実行できるという利点もあります。HazelcastのEntryProcessorはPartitionThread上で実行されるため、
明示的なロックは必要ないというスタンスになっています。
この点については、また後で書きましょう。
要するに、EntryProcessorと呼ばれる、処理を実装したオブジェクトをデータを持ったNodeに投げつけて実行することで、グリッド上で高速、安全に
処理ができるという代物です。
と、前置きはそんな感じにして書いていってみましょう。
準備
まずは、Maven依存関係から。
<dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast</artifactId> <version>3.9.2</version> </dependency>
Hazelcast 3.9.2を使用します。
あとは、テストライブラリも。
<dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.0.3</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.9.0</version> <scope>test</scope> </dependency>
テストコードの雛形
テストコードを作成するにあたり、先に骨格となる雛形のクラスを書いておきます。
src/test/java/org/littlewings/hazelcast/entryprocessor/EntryProcessorTest.java
package org.littlewings.hazelcast.entryprocessor; import java.util.Arrays; import java.util.List; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IMap; import com.hazelcast.core.PartitionService; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; public class EntryProcessorTest { // ここに、テストを書く!! void withHazelcastInstance(int numInstances, Consumer<HazelcastInstance> fun) { List<HazelcastInstance> instances = IntStream .rangeClosed(1, numInstances) .mapToObj(i -> Hazelcast.newHazelcastInstance()) .collect(Collectors.toList()); try { fun.accept(instances.get(0)); } finally { instances.forEach(HazelcastInstance::shutdown); Hazelcast.shutdownAll(); } } }
Hazelcastのクラスタを簡単に構成できるヘルパーメソッド付きで、以降のコードはこの中に書いていきたいと思います。
EntryProcessorがデータのあるNodeで動作していることを確認する
最初は、EntryProcessorが本当にデータのあるNodeで実行されているか確認してみましょう。
作成したのは、こんなコード。
@Test public void confirmLocationRunEntryProcessor() { withHazelcastInstance(3, hazelcast -> { IMap<String, String> map = hazelcast.getMap("default"); IntStream.rangeClosed(1, 10).forEach(i -> map.put("key" + i, "value" + i)); PartitionService ps = hazelcast.getPartitionService(); IntStream .rangeClosed(1, 10) .forEach(i -> { String key = "key" + i; System.out.printf("key = %s, location = %s%n", key, ps.getPartition(key).getOwner().getUuid()); assertThat( map.executeOnKey(key, new ReturnLocationEntryProcessor()) ).isEqualTo(ps.getPartition(key).getOwner().getUuid()); }); }); }
HazelcastのNodeを3つ起動して
withHazelcastInstance(3, hazelcast -> {
Distributed Mapにデータを放り込んでみます。
IMap<String, String> map = hazelcast.getMap("default"); IntStream.rangeClosed(1, 10).forEach(i -> map.put("key" + i, "value" + i));
データの配置を確認したいので、PartitionServiceを取得。
PartitionService ps = hazelcast.getPartitionService();
ここで、各キーに対してEntryProcessorを起動して、EntryProcessorが動作したNodeのUUIDを返してもらい、その値とPartitionServiceで取得した値が
同じかどうか比較してみましょう。
IntStream .rangeClosed(1, 10) .forEach(i -> { String key = "key" + i; System.out.printf("key = %s, location = %s%n", key, ps.getPartition(key).getOwner().getUuid()); assertThat( map.executeOnKey(key, new ReturnLocationEntryProcessor()) ).isEqualTo(ps.getPartition(key).getOwner().getUuid()); });
EntryProcessorを呼び出しているのは、この部分です。
map.executeOnKey(key, new ReturnLocationEntryProcessor())
あるキーに対して、EntryProcessorを起動しています。
また、この時のキーのオーナーをログ出力しておくようにします。
System.out.printf("key = %s, location = %s%n", key, ps.getPartition(key).getOwner().getUuid());
実装したEntryProcessorはこちら。EntryProcessorを作成するには、AbstractEntryProcessorクラスを継承して作成するのが便利です。
src/test/java/org/littlewings/hazelcast/entryprocessor/ReturnLocationEntryProcessor.java
package org.littlewings.hazelcast.entryprocessor; import java.util.Map; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceAware; import com.hazelcast.core.PartitionService; import com.hazelcast.map.AbstractEntryProcessor; import com.hazelcast.map.LockAware; public class ReturnLocationEntryProcessor extends AbstractEntryProcessor<String, String> implements HazelcastInstanceAware { transient HazelcastInstance hazelcast; @Override public void setHazelcastInstance(HazelcastInstance hazelcastInstance) { this.hazelcast = hazelcastInstance; } @Override public Object process(Map.Entry<String, String> entry) { PartitionService ps = hazelcast.getPartitionService(); System.out.printf( "key = %s, run entry processor member = %s, owner? = %b, locked? = %b%n", entry.getKey(), hazelcast.getCluster().getLocalMember().getUuid(), ps.getPartition(entry.getKey()).getOwner().getUuid().equals(hazelcast.getCluster().getLocalMember().getUuid()), ((LockAware) entry).isLocked() ); return hazelcast.getCluster().getLocalMember().getUuid(); } }
HazelcastInstanceAwareインターフェースを実装することで、EntryProcessorを起動したHazelcastInstanceをEntryProcessor側で扱うことができます。
public class ReturnLocationEntryProcessor extends AbstractEntryProcessor<String, String> implements HazelcastInstanceAware { transient HazelcastInstance hazelcast; @Override public void setHazelcastInstance(HazelcastInstance hazelcastInstance) { this.hazelcast = hazelcastInstance; }
あとは、Local NodeのUUIDを返却しておしまい。
@Override public Object process(Map.Entry<String, String> entry) { PartitionService ps = hazelcast.getPartitionService(); System.out.printf( "key = %s, run entry processor member = %s, owner? = %b, locked? = %b%n", entry.getKey(), hazelcast.getCluster().getLocalMember().getUuid(), ps.getPartition(entry.getKey()).getOwner().getUuid().equals(hazelcast.getCluster().getLocalMember().getUuid()), ((LockAware) entry).isLocked() ); return hazelcast.getCluster().getLocalMember().getUuid(); }
動作している時の情報をログ出力していますが、こちらは少し後で。
こちらを動かすと、クラスタが構成されて…
※なお、NodeのUUIDって言っているのは、ここで出力されているもののことです
2 06, 2018 12:24:45 午前 com.hazelcast.internal.cluster.ClusterService 情報: [172.23.0.1]:5701 [dev] [3.9.2] Members {size:3, ver:3} [ Member [172.23.0.1]:5701 - a9f7e828-4768-4661-85b9-fd0f9638ec3d this Member [172.23.0.1]:5702 - 8d0a5a74-8e1d-4f44-b974-76fd7b0a49f9 Member [172.23.0.1]:5703 - e282e189-3d53-4c59-bdf3-777529a8ae7b ]
ログを確認します。
key = key1, location = 8d0a5a74-8e1d-4f44-b974-76fd7b0a49f9 key = key1, run entry processor member = 8d0a5a74-8e1d-4f44-b974-76fd7b0a49f9, owner? = true, locked? = false key = key1, run entry processor member = a9f7e828-4768-4661-85b9-fd0f9638ec3d, owner? = false, locked? = false key = key2, location = 8d0a5a74-8e1d-4f44-b974-76fd7b0a49f9 key = key2, run entry processor member = 8d0a5a74-8e1d-4f44-b974-76fd7b0a49f9, owner? = true, locked? = false key = key2, run entry processor member = a9f7e828-4768-4661-85b9-fd0f9638ec3d, owner? = false, locked? = false key = key3, location = e282e189-3d53-4c59-bdf3-777529a8ae7b key = key3, run entry processor member = e282e189-3d53-4c59-bdf3-777529a8ae7b, owner? = true, locked? = false key = key3, run entry processor member = a9f7e828-4768-4661-85b9-fd0f9638ec3d, owner? = false, locked? = false key = key4, location = a9f7e828-4768-4661-85b9-fd0f9638ec3d key = key4, run entry processor member = a9f7e828-4768-4661-85b9-fd0f9638ec3d, owner? = true, locked? = false key = key4, run entry processor member = e282e189-3d53-4c59-bdf3-777529a8ae7b, owner? = false, locked? = false key = key5, location = 8d0a5a74-8e1d-4f44-b974-76fd7b0a49f9 key = key5, run entry processor member = 8d0a5a74-8e1d-4f44-b974-76fd7b0a49f9, owner? = true, locked? = false key = key5, run entry processor member = e282e189-3d53-4c59-bdf3-777529a8ae7b, owner? = false, locked? = false key = key6, location = 8d0a5a74-8e1d-4f44-b974-76fd7b0a49f9 key = key6, run entry processor member = 8d0a5a74-8e1d-4f44-b974-76fd7b0a49f9, owner? = true, locked? = false key = key6, run entry processor member = a9f7e828-4768-4661-85b9-fd0f9638ec3d, owner? = false, locked? = false key = key7, location = e282e189-3d53-4c59-bdf3-777529a8ae7b key = key7, run entry processor member = e282e189-3d53-4c59-bdf3-777529a8ae7b, owner? = true, locked? = false key = key7, run entry processor member = a9f7e828-4768-4661-85b9-fd0f9638ec3d, owner? = false, locked? = false key = key8, location = 8d0a5a74-8e1d-4f44-b974-76fd7b0a49f9 key = key8, run entry processor member = 8d0a5a74-8e1d-4f44-b974-76fd7b0a49f9, owner? = true, locked? = false key = key8, run entry processor member = a9f7e828-4768-4661-85b9-fd0f9638ec3d, owner? = false, locked? = false key = key9, location = a9f7e828-4768-4661-85b9-fd0f9638ec3d key = key9, run entry processor member = a9f7e828-4768-4661-85b9-fd0f9638ec3d, owner? = true, locked? = false key = key9, run entry processor member = 8d0a5a74-8e1d-4f44-b974-76fd7b0a49f9, owner? = false, locked? = false key = key10, location = 8d0a5a74-8e1d-4f44-b974-76fd7b0a49f9 key = key10, run entry processor member = 8d0a5a74-8e1d-4f44-b974-76fd7b0a49f9, owner? = true, locked? = false key = key10, run entry processor member = a9f7e828-4768-4661-85b9-fd0f9638ec3d, owner? = false, locked? = false
テストはパスしているのでOK。「location = 〜」と書かれているログと、その次に出力されているログのUUIDが一致していますね?
なんですが、なんかEntryProcessorのログがもう1行多く出ています。これは、BackupEntryProcessorが動作しているからで、エントリのデータを
書き換えるような処理の場合はこのBackupEntryProcessorがBackup Nodeに対する処理を行うことになります。
Processing Backup Entries
よって、このBackupEntryProcessorはデータのオーナーではありません。ログをよくよく見ると、「owner? = 」の部分が「false」になっています。
key = key1, run entry processor member = a9f7e828-4768-4661-85b9-fd0f9638ec3d, owner? = false, locked? = false
なので、今回のようなデータを変更しない処理ではあまり関係がなかったりします…。というか、余計な処理が動くことになります。
AbstractEntryProcessorクラスを継承してEntryProcessorを作成すると、デフォルトでBackupEntryProcessorも実行するようになるのですが、これを止めたい
場合には、AbstractEntryProcessorのコンストラクタに「false」を渡せばOKです。
単純にEntryProcessorインターフェースを実装している場合は、getBackupProcessorメソッドでnullを返すようにオーバーライドすればよいでしょう。
この場合、BackupEntryProcessorは起動しなくなります。
あと、ログ出力時にロックの有無について確認しているのですが
((LockAware) entry).isLocked()
IMapインターフェースのJavadocに書いてあるように全エントリや複数のキーに対してEntryProcessorを起動するexecuteOnEntriesメソッドや
executeOnKeysメソッドについては、ロックに対応していません。
http://docs.hazelcast.org/docs/3.9.2/javadoc/com/hazelcast/core/IMap.html
単一のキーに対して実行するexecuteOnKeyメソッドやsubmitToKeyメソッドの場合は、ロックが取れるまで待機することになります。
複数のエントリ、キーに対してEntryProcessorを起動する場合は、自分で対象のエントリがロックされているかどうかを確認する必要があります。
それを確認するためのものがLockAwareインターフェースで、EntryProcessor#processに渡されるMap.EntryのインスタンスはLockAwareを
実装しているので、対象のエントリがロックされているかどうかはこのようにして確認することができます。
((LockAware) entry).isLocked()
まあ、今回はロックされていません、と。
とりあえず、EntryProcessorの実行場所については確認することができました。
Data Affinityと合わせて
続いて、EntryProcessorをData Affinityと合わせて確認してみましょう。
Data Affinityについては、こちら。
HazelcastのData Affinityを試してみる - CLOVER
Data Affinityを使用すると、関連するデータを同じNode、Partitionに集めることができます。こうすることで、関連するデータを扱う際にネットワーク
アクセスを減らすことができます。
まずは、配置の確認として、カテゴリと書籍というサンプルでこういう関係を考えてみます。
同じカテゴリに属する書籍は、同じNode、同じPartitionに配置されるように調整してみます。
カテゴリに対応するクラス。このクラスは、カテゴリ名をキーとしてDistributed Mapに登録する想定とします。
src/test/java/org/littlewings/hazelcast/entryprocessor/Category.java
package org.littlewings.hazelcast.entryprocessor; import java.io.Serializable; public class Category implements Serializable { private static final long serialVersionUID = 1L; String name; public static Category create(String name) { return new Category(name); } public Category(String name) { this.name = name; } public String getName() { return name; } }
続いて、書籍についてのクラス。Categoryへの参照を持ちますが、まあシリアライズ可能なふつうのクラスです。
src/test/java/org/littlewings/hazelcast/entryprocessor/Book.java
package org.littlewings.hazelcast.entryprocessor; import java.io.Serializable; public class Book implements Serializable { private static final long serialVersionUID = 1L; String isbn; String title; int price; Category category; public static Book create(String isbn, String title, int price, Category category) { return new Book(isbn, title, price, category); } public Book(String isbn, String title, int price, Category category) { this.isbn = isbn; this.title = title; this.price = price; this.category = category; } public String getIsbn() { return isbn; } public String getTitle() { return title; } public int getPrice() { return price; } public Category getCategory() { return category; } }
ここで、データの配置先をコントロールするために、Bookクラスのキーに対応するクラスを用意します。
src/test/java/org/littlewings/hazelcast/entryprocessor/BookKey.java
package org.littlewings.hazelcast.entryprocessor; import java.io.Serializable; import com.hazelcast.core.PartitionAware; public class BookKey implements PartitionAware<String>, Serializable { private static final long serialVersionUID = 1L; String isbn; Category category; public static BookKey create(String isbn, Category category) { return new BookKey(isbn, category); } public BookKey(String isbn, Category category) { this.isbn = isbn; this.category = category; } public String getIsbn() { return isbn; } public Category getCategory() { return category; } @Override public String getPartitionKey() { return category.getName(); } }
この用途のクラスは、PartitionAwareインターフェースを実装して
public class BookKey implements PartitionAware<String>, Serializable {
getPartitionKeyメソッドをオーバーライドする必要があります。
@Override public String getPartitionKey() { return category.getName(); }
PartitionAware#getPartitionKeyメソッドで返却する値は、配置先をコントロールするためのものとなります。今回は、Categoryクラスと同じ配置と
したいため、カテゴリの名前を返すように実装しています。
確認。
@Test public void dataAffinity() { Category springCategory = Category.create("spring"); Category javaeeCategory = Category.create("javaee"); Book[] springBooks = { Book.create("978-4798142470", "Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発", 4320, springCategory), Book.create("978-4774182179", "[改訂新版]Spring入門 ――Javaフレームワーク・より良い設計とアーキテクチャ", 4104, springCategory), Book.create("978-4777519699", "はじめてのSpring Boot―スプリング・フレームワークで簡単Javaアプリ開発", 2700, springCategory) }; Book[] javaeeBooks = { Book.create("978-4774183169", "パーフェクト Java EE", 3456, javaeeCategory), Book.create("978-4798140926", "Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築", 4104, javaeeCategory), Book.create("978-4798124605", "Beginning Java EE 6〜GlassFish 3で始めるエンタープライズJava", 4536, javaeeCategory) }; withHazelcastInstance(3, hazelcast -> { IMap<String, Category> categoryMap = hazelcast.getMap("categories"); IMap<BookKey, Book> bookMap = hazelcast.getMap("books"); categoryMap.put(springCategory.getName(), springCategory); categoryMap.put(javaeeCategory.getName(), javaeeCategory); Arrays.stream(springBooks).forEach(book -> bookMap.put(BookKey.create(book.getIsbn(), book.getCategory()), book)); Arrays.stream(javaeeBooks).forEach(book -> bookMap.put(BookKey.create(book.getIsbn(), book.getCategory()), book)); PartitionService ps = hazelcast.getPartitionService(); assertThat( Arrays .stream(springBooks) .map(book -> ps.getPartition(BookKey.create(book.getIsbn(), book.getCategory())).getOwner().getUuid()) .collect(Collectors.toSet()) ) .hasSize(1) .containsOnly(ps.getPartition(springCategory.getName()).getOwner().getUuid()); assertThat( Arrays .stream(springBooks) .map(book -> ps.getPartition(BookKey.create(book.getIsbn(), book.getCategory())).getPartitionId()) .collect(Collectors.toSet()) ) .hasSize(1) .containsOnly(ps.getPartition(springCategory.getName()).getPartitionId()); assertThat( Arrays .stream(javaeeBooks) .map(book -> ps.getPartition(BookKey.create(book.getIsbn(), book.getCategory())).getOwner().getUuid()) .collect(Collectors.toSet()) ) .hasSize(1) .containsOnly(ps.getPartition(javaeeCategory.getName()).getOwner().getUuid()); assertThat( Arrays .stream(javaeeBooks) .map(book -> ps.getPartition(BookKey.create(book.getIsbn(), book.getCategory())).getPartitionId()) .collect(Collectors.toSet()) ) .hasSize(1) .containsOnly(ps.getPartition(javaeeCategory.getName()).getPartitionId()); }); }
2つのDistributed Mapを使用しています。
IMap<String, Category> categoryMap = hazelcast.getMap("categories"); IMap<BookKey, Book> bookMap = hazelcast.getMap("books");
データを登録して、データの配置先NodeのUUIDおよびPartitionのIDを確認していますが、いずれもカテゴリごとに同じ配置先となっていることが
確認できます。
これで、同じNode、Partitionにデータを集められる状態になりました。
EntryProcessorで、Data Affinityを使って配置したデータを扱う
では、ここまでで用意したCategoryクラス、BookKeyクラス、Bookクラスを使用して、EntryProcessorで同じカテゴリに属する書籍の価格の合計を
求めるようにしてみましょう。
こうすることで、同じカテゴリに属する書籍のデータにアクセスする際には、ネットワークアクセスが発生しなくなります、と。
その用途で作成した、EntryProcessorはこちら。
src/test/java/org/littlewings/hazelcast/entryprocessor/CalcPriceEntryProcessor.java
package org.littlewings.hazelcast.entryprocessor; import java.util.ArrayList; import java.util.List; import java.util.Map; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceAware; import com.hazelcast.map.AbstractEntryProcessor; public class CalcPriceEntryProcessor extends AbstractEntryProcessor<String, Integer> implements HazelcastInstanceAware { transient HazelcastInstance hazelcast; List<String> isbnList; public CalcPriceEntryProcessor(List<String> isbnList) { super(false); this.isbnList = new ArrayList<>(isbnList); } @Override public void setHazelcastInstance(HazelcastInstance hazelcastInstance) { this.hazelcast = hazelcastInstance; } @Override public Object process(Map.Entry<String, Integer> entry) { Category category = Category.create(entry.getKey()); Map<String, Book> bookMap = hazelcast.getMap("books"); return isbnList.stream().mapToInt(isbn -> bookMap.get(BookKey.create(isbn, category)).getPrice()).sum(); } }
EntryProcessorはカテゴリを中心に起動し、合算する書籍のISBNはEntryProcessorにあらかじめ教えておく形としました。
BackupEntryProcessorはオフにしています。
public CalcPriceEntryProcessor(List<String> isbnList) { super(false); this.isbnList = new ArrayList<>(isbnList); }
HazelcastInstanceAwareインターフェースも実装しているため、こちらで受け取ったHazelcastInstanceに対して、書籍用のDistributed Mapに
アクセスして対象の書籍データを取得、合算します。
@Override public Object process(Map.Entry<String, Integer> entry) { Category category = Category.create(entry.getKey()); Map<String, Book> bookMap = hazelcast.getMap("books"); return isbnList.stream().mapToInt(isbn -> bookMap.get(BookKey.create(isbn, category)).getPrice()).sum(); }
この時、カテゴリと同じNode、Partitionに書籍データが配置されているので、ネットワークアクセスが発生しないということになります。
確認のテストコードは、こちら。
@Test public void calcPrice() { Category springCategory = Category.create("spring"); Category javaeeCategory = Category.create("javaee"); Book[] springBooks = { Book.create("978-4798142470", "Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発", 4320, springCategory), Book.create("978-4774182179", "[改訂新版]Spring入門 ――Javaフレームワーク・より良い設計とアーキテクチャ", 4104, springCategory), Book.create("978-4777519699", "はじめてのSpring Boot―スプリング・フレームワークで簡単Javaアプリ開発", 2700, springCategory) }; Book[] javaeeBooks = { Book.create("978-4774183169", "パーフェクト Java EE", 3456, javaeeCategory), Book.create("978-4798140926", "Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築", 4104, javaeeCategory), Book.create("978-4798124605", "Beginning Java EE 6〜GlassFish 3で始めるエンタープライズJava", 4536, javaeeCategory) }; withHazelcastInstance(3, hazelcast -> { IMap<String, Category> categoryMap = hazelcast.getMap("categories"); IMap<BookKey, Book> bookMap = hazelcast.getMap("books"); categoryMap.put(springCategory.getName(), springCategory); categoryMap.put(javaeeCategory.getName(), javaeeCategory); Arrays.stream(springBooks).forEach(book -> bookMap.put(BookKey.create(book.getIsbn(), book.getCategory()), book)); Arrays.stream(javaeeBooks).forEach(book -> bookMap.put(BookKey.create(book.getIsbn(), book.getCategory()), book)); assertThat( categoryMap.executeOnKey( springCategory.getName(), new CalcPriceEntryProcessor(Arrays.stream(springBooks).map(Book::getIsbn).collect(Collectors.toList())) ) ).isEqualTo(11124); assertThat( categoryMap.executeOnKey( javaeeCategory.getName(), new CalcPriceEntryProcessor(Arrays.stream(javaeeBooks).map(Book::getIsbn).collect(Collectors.toList())) ) ).isEqualTo(12096); }); }
まあ、これだと計算結果はわかっても、ネットワークアクセスが発生していないかどうかはわからないのですが…。
雰囲気、こんな感じですよ、と。
オマケ
EntryProcessorには、読み取りのみの操作向けのReadOnly、
それからEntryProcessorを特定のExecutorの元で実行する、Offloadableな機能もあるようです。
それぞれインターフェースを実装して使用するもののようですが、興味があれば…。
もうちょっと中身を追ってみる
ここまでではなんなので、もう少し中身を追ってみましょう。
EntryProcessorの実行は、やはりHazelcastのOperationの一種として実装されており、EntryProcessorOperationとその関連クラスという形で表現されています。
https://github.com/hazelcast/hazelcast/blob/v3.9.2/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EntryOperation.java#L172-L179
https://github.com/hazelcast/hazelcast/blob/v3.9.2/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EntryOperator.java#L162-L180
バックアップの方についても、Operationが存在します。
https://github.com/hazelcast/hazelcast/blob/v3.9.2/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EntryBackupOperation.java#L57-L61
どちらも、EntryOperator上でEntryProcessorが呼び出されます。
https://github.com/hazelcast/hazelcast/blob/v3.9.2/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EntryOperator.java#L329-L336
ところで、EntryProcessorはロックや並行性について考えなくていい、みたいなことが書いていましたね。こちらについても確認してみましょう。
HazelcastのOperationは、ローカル実行であってもリモート実行であっても実行されるNodeに管理されているスレッドのQueueに積まれ、実行されます。
Partitionに対する操作の場合は、それをPartitionThreadが受け持ちます。EntryProcessorは、PartitionThreadが受け持つ範囲となるためPartitionThreadの
Queueにタスクとして積み込まれます。
https://github.com/hazelcast/hazelcast/blob/v3.9.2/hazelcast/src/main/java/com/hazelcast/spi/impl/operationexecutor/impl/OperationExecutorImpl.java#L375-L376
Queueからタスクを取り出してOperationを実行するのは、PartitionThreadなどの親クラスであるOperationThreadで実装されています。
https://github.com/hazelcast/hazelcast/blob/v3.9.2/hazelcast/src/main/java/com/hazelcast/spi/impl/operationexecutor/impl/OperationThread.java#L110-L142
https://github.com/hazelcast/hazelcast/blob/v3.9.2/hazelcast/src/main/java/com/hazelcast/spi/impl/operationexecutor/impl/PartitionOperationThread.java
PartitionThreadはPartitionの数だけあり、このあたりから言えるのはPartitionひとつにつき同時に動作するOperationはひとつだということですね。
ロックを気にしなくてよいという点についてですが、Operationを実行する前に待つ必要があるかどうかを確認します。
https://github.com/hazelcast/hazelcast/blob/v3.9.2/hazelcast/src/main/java/com/hazelcast/spi/impl/operationservice/impl/OperationRunnerImpl.java#L190
https://github.com/hazelcast/hazelcast/blob/v3.9.2/hazelcast/src/main/java/com/hazelcast/spi/impl/operationservice/impl/OperationRunnerImpl.java#L255
実態としては、ロックが取れるかどうかを確認しています。
https://github.com/hazelcast/hazelcast/blob/v3.9.2/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EntryOperation.java#L405-L421
https://github.com/hazelcast/hazelcast/blob/v3.9.2/hazelcast/src/main/java/com/hazelcast/concurrent/lock/LockResourceImpl.java#L169-L171
これを行うのは、BlockingOperationインターフェースを実装しているクラスの場合であり、EntryOperationはBlockingOperationインタフェースを
実装しています。
http://docs.hazelcast.org/docs/3.9.2/javadoc/com/hazelcast/spi/BlockingOperation.html
まあ、この「待つべきかどうか?」を判定するshouldWaitメソッドの実装は、個々のBlockingOperationの実装に任されます、と。
ここで、「待つべきだ」と判定された場合には、キューに積まれて実行が先送りされます。
https://github.com/hazelcast/hazelcast/blob/v3.9.2/hazelcast/src/main/java/com/hazelcast/spi/impl/operationparker/impl/OperationParkerImpl.java#L111-L122
https://github.com/hazelcast/hazelcast/blob/v3.9.2/hazelcast/src/main/java/com/hazelcast/spi/impl/operationparker/impl/OperationParkerImpl.java#L118
この部分が、「EntryProcessorはロックを取れなければ、それまで待機する」と言っていた部分になります。
こういう理屈で、Partition内で動作するタスク数(Operation数)の関係と、EntryProcessorは裏であらかじめロックを取れるか確認することで
並行性を気にせず安全に実行できる仕組みになっている、というわけですね。
だいたい、仕組みはつかめたのではないでしょうか。
まとめ
HazelcastのEntry Processorについて、Data Affinityと絡めた確認や、内部の動きなどについて簡単に確認してみました。
内部動作を追うのはちょっと大変でしたが、理解が深まったのではないかなと思います。
今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-entry-processor-re