JCacheには、Cacheに対する各種操作(put/removeなど)、そして有効期限切れ時のイベントに対して反応するListenerを設定することができます。
Listenerを作成するには、通知を受けたいイベントに応じたCacheEntryListenerのサブインターフェースを実装したクラスを作成し、Cache作成時、もしくはCache作成後に設定します。
それでは、順に見ていってみます。
イベントとCacheEntryListenerのサブインターフェース
通知を受けたいイベントに応じたCacheEntryListenerのサブインターフェースを実装する必要があると書きましたが、具体的には以下のようなバリエーションになります。
イベントの種類 | 意味 | 対応するCacheEntryListenerのサブインターフェース | 実装するメソッド |
---|---|---|---|
CREATED | Cacheエントリが作成された時 | CacheEntryCreatedListener | onCreated |
EXPIRED | Cacheエントリが有効期限切れした時 | CacheEntryExpiredListener | onExpired |
REMOVED | Cacheエントリが削除された時 | CacheEntryRemovedListener | onRemoved |
UPDATED | Cacheエントリが更新された時 | CacheEntryUpdatedListener | onUpdated |
Cacheに対するどの操作(メソッド呼び出し)に対して、どのイベントが発生するのかということですが、正確にはJSR-107の「8.4. Invocation of Listeners」を確認いただくとよいのですが…そう直感に反するものはないと思います。
ただ、EXPIREDだけは例外で、どの操作にも紐付けられていません。EXPIREDイベントが発生するタイミングは、実装依存ということになっています。
その他、CacheEntryListenerに対する注意事項は、以下の通りです。
※JSR-107から
- CacheEntryListenerはSerializableであった方がよい(特に、CacheEntryListenerのインスタンスを直接設定する場合は必須)
- イベントは、Cacheの操作後に発生する
- 同期モードでは、イベントの発生順に通知され、Listenerが制御を戻すまでは呼び出したスレッドはブロックされる
- 非同期モードでは、イベントの発生順に通知されることは保障されないが、同じキーに対するイベントは順番に処理されなければならない
- Listenerは複数登録可能だが、呼び出し順は定められていない
- Listenerが例外を投げると、Cacheへの操作が失敗する
- ListenerはCacheを変更してはならない。デッドロックが発生する可能性があり、デッドロックを検知できるかどうかは実装依存となる
- 登録されたListenerに対して、イベントは最大1回呼び出される(クラスタ環境でブロードキャストされない)
- Listenerは、イベントをin-processで処理することは必須ではない
- 分散環境では、Listenerはどこでも実行されうる
とまあ、こんな感じ。裏を取っていないものもいくつかありますが、自分が触った範囲だと非同期モードというのは…??
※例外スロー時は、ちょっと気になる…
とりあえず、試してみましょう。
準備
今回のJCacheの実装は、Reference Implementationを使用します。Maven依存関係は、以下の通り。
<dependency> <groupId>javax.cache</groupId> <artifactId>cache-api</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.jsr107.ri</groupId> <artifactId>cache-ri-impl</artifactId> <version>1.0.0</version> </dependency>
また、JUnitとAssertJを動作確認に使用しました。大したことはしていませんが。
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.1.0</version> <scope>test</scope> </dependency>
CacheEntryListenerを実装する
それでは、CacheEntryListenerを作成してみます。4種類のイベントと、それに対するインターフェースがあることを紹介しましたが、今回はすべてのインターフェースを実装してみます。
つまり、こうなりました。
src/main/java/org/littlewings/jcache/listener/MyCacheListener.java
package org.littlewings.jcache.listener; import java.io.Serializable; import javax.cache.event.CacheEntryCreatedListener; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryExpiredListener; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryRemovedListener; import javax.cache.event.CacheEntryUpdatedListener; public class MyCacheListener implements CacheEntryCreatedListener<String, String>, CacheEntryExpiredListener<String, String>, CacheEntryRemovedListener<String, String>, CacheEntryUpdatedListener<String, String>, Serializable { @Override public void onCreated(Iterable<CacheEntryEvent<? extends String, ? extends String>> cacheEntryEvents) throws CacheEntryListenerException { cacheEntryEvents .forEach(event -> System.out.printf("[%s] onCreated Event at[%s], EventType[%s], KeyValue = %s:%s, OldValue = %s%n", Thread.currentThread().getName(), getClass().getSimpleName(), event.getEventType(), event.getKey(), event.getValue(), event.isOldValueAvailable() ? event.getOldValue() : null ) ); } @Override public void onExpired(Iterable<CacheEntryEvent<? extends String, ? extends String>> cacheEntryEvents) throws CacheEntryListenerException { cacheEntryEvents .forEach(event -> System.out.printf("[%s] onExpired Event at[%s], EventType[%s], KeyValue = %s:%s, OldValue = %s%n", Thread.currentThread().getName(), getClass().getSimpleName(), event.getEventType(), event.getKey(), event.getValue(), event.isOldValueAvailable() ? event.getOldValue() : null ) ); } @Override public void onRemoved(Iterable<CacheEntryEvent<? extends String, ? extends String>> cacheEntryEvents) throws CacheEntryListenerException { cacheEntryEvents .forEach(event -> System.out.printf("[%s] onRemoved Event at[%s], EventType[%s], KeyValue = %s:%s, OldValue = %s%n", Thread.currentThread().getName(), getClass().getSimpleName(), event.getEventType(), event.getKey(), event.getValue(), event.isOldValueAvailable() ? event.getOldValue() : null ) ); } @Override public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends String>> cacheEntryEvents) throws CacheEntryListenerException { cacheEntryEvents .forEach(event -> System.out.printf("[%s] onUpdated Event at[%s], EventType[%s], KeyValue = %s:%s, OldValue = %s%n", Thread.currentThread().getName(), getClass().getSimpleName(), event.getEventType(), event.getKey(), event.getValue(), event.isOldValueAvailable() ? event.getOldValue() : null ) ); } }
イベントは、Iterable<CacheEntryEvent>という形で受け取るようです。
Cache作成時にCacheEntryListenerを登録する
まずは、オーソドックスにCache作成時にListenerを登録してみます。
※以降のテストコードには、以下のimport文が定義されているものとします。
import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import javax.cache.Cache; import javax.cache.CacheManager; import javax.cache.Caching; import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.Configuration; import javax.cache.configuration.FactoryBuilder; import javax.cache.configuration.MutableCacheEntryListenerConfiguration; import javax.cache.configuration.MutableConfiguration; import javax.cache.expiry.Duration; import javax.cache.expiry.TouchedExpiryPolicy; import javax.cache.spi.CachingProvider; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat;
こんなコードになりました。
@Test public void testSimpleCacheEntryListener() { CacheEntryListenerConfiguration<String, String> listenerConfiguration = new MutableCacheEntryListenerConfiguration<>( FactoryBuilder.factoryOf(MyCacheListener.class), // CacheEntryListener null, // CacheEntryEventFilter true, // isOldValueRequired true // isSynchronous ); Configuration<String, String> configuration = new MutableConfiguration<String, String>() .setTypes(String.class, String.class) .addCacheEntryListenerConfiguration(listenerConfiguration) .setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 3))); try (CachingProvider provider = Caching.getCachingProvider(); CacheManager manager = provider.getCacheManager(); Cache<String, String> cache = manager.createCache("listenerCache", configuration)) { IntStream .rangeClosed(1, 5) .forEach(i -> cache.put("key" + i, "value" + i)); // Created Event assertThat(cache.get("key1")) .isEqualTo("value1"); // 特にイベントなし(CacheLoaderがいない場合) cache.remove("key2"); // Removed Event cache.put("key3", "new-value3"); // Updated Event try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { // ignore } IntStream .rangeClosed(1, 5) .forEach(i -> cache.get("key" + i)); // Expired Event(RIの場合) } }
まずは、CacheEntryListenerConfigurationのインスタンスを作成します。ここで、先ほど作成したCacheEntryListenrを設定します。
CacheEntryListenerConfiguration<String, String> listenerConfiguration = new MutableCacheEntryListenerConfiguration<>( FactoryBuilder.factoryOf(MyCacheListener.class), // CacheEntryListener null, // CacheEntryEventFilter true, // isOldValueRequired true // isSynchronous );
CacheEntryListenerの設定方法は、FactoryBuilder経由で行いますが
- 作成したCacheEntryListenerのClassクラス
- 作成したCacheEntryListenerのクラス名
- 作成したCacheEntryListenerのインスタンス(要Serializable)
のいずれかの方法で指定します。
その他の引数は、CacheEntryEventFilter(後述)、古い値を取得可能にするか、同期モードか、となります。このうち、同期モードはどうも効かなかったような…?あと、古い値を取れるかどうかも割と実装依存な…?
それを踏まえて、Listener側の実装はこうなりました、と。
System.out.printf("[%s] onUpdated Event at[%s], EventType[%s], KeyValue = %s:%s, OldValue = %s%n", Thread.currentThread().getName(), getClass().getSimpleName(), event.getEventType(), event.getKey(), event.getValue(), event.isOldValueAvailable() ? event.getOldValue() : null )
※スレッド名を出力しましたけど、あんまり意味なかった…。
OldValueが取得可能かどうかは、事前に確認しています。CacheEntryEvent#isOldValueAvailableがfalseを返す状態でムリにOldValueを取得しようとすると、UnsupportedOperationExceptionがスローされます。
続いて、ConfigurationにこのCacheEntryListenerConfigurationのインスタンスを設定し、Cacheを作成します。
Configuration<String, String> configuration = new MutableConfiguration<String, String>() .setTypes(String.class, String.class) .addCacheEntryListenerConfiguration(listenerConfiguration) .setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 3)));
あとは、Cacheに対する操作を行うだけですね。
CREATED。
IntStream .rangeClosed(1, 5) .forEach(i -> cache.put("key" + i, "value" + i)); // Created Event
この時の出力結果。
[main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key1:value1, OldValue = null [main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key2:value2, OldValue = null [main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key3:value3, OldValue = null [main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key4:value4, OldValue = null [main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key5:value5, OldValue = null
getは何も起こりません。
assertThat(cache.get("key1")) .isEqualTo("value1"); // 特にイベントなし(CacheLoaderがいない場合)
CacheLoaderがいない場合は、ですが。
REMOVED。
cache.remove("key2"); // Removed Event
出力結果。
[main] onRemoved Event at[MyCacheListener], EventType[REMOVED], KeyValue = key2:value2, OldValue = null
UPDATED。
cache.put("key3", "new-value3"); // Updated Event
出力結果。
[main] onUpdated Event at[MyCacheListener], EventType[UPDATED], KeyValue = key3:new-value3, OldValue = value3
EXPIRED。
※RIの場合は、Cacheへの操作時に発生するようです
IntStream .rangeClosed(1, 5) .forEach(i -> cache.get("key" + i)); // Expired Event(RIの場合)
出力結果。
[main] onExpired Event at[MyCacheListener], EventType[EXPIRED], KeyValue = key1:value1, OldValue = null [main] onExpired Event at[MyCacheListener], EventType[EXPIRED], KeyValue = key3:new-value3, OldValue = null [main] onExpired Event at[MyCacheListener], EventType[EXPIRED], KeyValue = key4:value4, OldValue = null [main] onExpired Event at[MyCacheListener], EventType[EXPIRED], KeyValue = key5:value5, OldValue = null
まあ、こんな感じではないでしょうか?
CacheEntryListenerを複数設定する
先にも書きましたが、CacheEntryListenerは複数設定することができます。例えば、最初に作ったListenerを継承して、もうひとつListenerを作成してみます。
public class MyCacheListener2 extends MyCacheListener { }
この2つのListenerを使ったコードは、こちら。
@Test public void testSimpleCacheEntryListenerMultiple() { CacheEntryListenerConfiguration<String, String> listenerConfiguration1 = new MutableCacheEntryListenerConfiguration<>( FactoryBuilder.factoryOf(MyCacheListener.class), // CacheEntryListener null, // CacheEntryEventFilter true, // isOldValueRequired true // isSynchronous ); CacheEntryListenerConfiguration<String, String> listenerConfiguration2 = new MutableCacheEntryListenerConfiguration<>( FactoryBuilder.factoryOf(MyCacheListener2.class), // CacheEntryListener null, // CacheEntryEventFilter true, // isOldValueRequired true // isSynchronous ); Configuration<String, String> configuration = new MutableConfiguration<String, String>() .setTypes(String.class, String.class) .addCacheEntryListenerConfiguration(listenerConfiguration1) .addCacheEntryListenerConfiguration(listenerConfiguration2) .setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 3))); try (CachingProvider provider = Caching.getCachingProvider(); CacheManager manager = provider.getCacheManager(); Cache<String, String> cache = manager.createCache("listenerCache", configuration)) { IntStream .rangeClosed(1, 5) .forEach(i -> cache.put("key" + i, "value" + i)); // Created Event assertThat(cache.get("key1")) .isEqualTo("value1"); // 特にイベントなし(CacheLoaderがいない場合) cache.remove("key2"); // Removed Event cache.put("key3", "new-value3"); // Updated Event try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { // ignore } IntStream .rangeClosed(1, 5) .forEach(i -> cache.get("key" + i)); // Expired Event(RIの場合) } }
それぞれにCacheEntryListenerConfigurationのインスタンスを作成して、Configurationに設定します。
Configuration<String, String> configuration = new MutableConfiguration<String, String>() .setTypes(String.class, String.class) .addCacheEntryListenerConfiguration(listenerConfiguration1) .addCacheEntryListenerConfiguration(listenerConfiguration2) .setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 3)));
あとは、普通に動かすだけ…出力結果は、この通り。
[main] onCreated Event at[MyCacheListener2], EventType[CREATED], KeyValue = key1:value1, OldValue = null [main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key1:value1, OldValue = null [main] onCreated Event at[MyCacheListener2], EventType[CREATED], KeyValue = key2:value2, OldValue = null [main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key2:value2, OldValue = null [main] onCreated Event at[MyCacheListener2], EventType[CREATED], KeyValue = key3:value3, OldValue = null [main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key3:value3, OldValue = null [main] onCreated Event at[MyCacheListener2], EventType[CREATED], KeyValue = key4:value4, OldValue = null [main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key4:value4, OldValue = null [main] onCreated Event at[MyCacheListener2], EventType[CREATED], KeyValue = key5:value5, OldValue = null [main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key5:value5, OldValue = null [main] onRemoved Event at[MyCacheListener2], EventType[REMOVED], KeyValue = key2:value2, OldValue = null [main] onRemoved Event at[MyCacheListener], EventType[REMOVED], KeyValue = key2:value2, OldValue = null [main] onUpdated Event at[MyCacheListener2], EventType[UPDATED], KeyValue = key3:new-value3, OldValue = value3 [main] onUpdated Event at[MyCacheListener], EventType[UPDATED], KeyValue = key3:new-value3, OldValue = value3
まあ、2倍になりまして。
CacheにListenerを動的に追加する
ここまでは、Cache作成時にListenerを設定していましたが、今度はCacheに動的にListenerを追加してみます。こんなコードになりました。
@Test public void testListenerDynamically() { Configuration<String, String> configuration = new MutableConfiguration<String, String>() .setTypes(String.class, String.class) .setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 3))); try (CachingProvider provider = Caching.getCachingProvider(); CacheManager manager = provider.getCacheManager(); Cache<String, String> cache = manager.createCache("listenerCache", configuration)) { IntStream .rangeClosed(1, 5) .forEach(i -> cache.put("key" + i, "value" + i)); // Created Event(このイベントは出力されない) CacheEntryListenerConfiguration<String, String> listenerConfiguration = new MutableCacheEntryListenerConfiguration<>( FactoryBuilder.factoryOf(MyCacheListener.class), // CacheEntryListener null, // CacheEntryEventFilter true, // isOldValueRequired true // isSynchronous ); // Listener登録 cache.registerCacheEntryListener(listenerConfiguration); cache.remove("key2"); // Removed Event(このイベントは出力される) cache.put("key3", "new-value3"); // Updated Event(このイベントは出力される) // Listener削除 cache.deregisterCacheEntryListener(listenerConfiguration); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { // ignore } IntStream .rangeClosed(1, 5) .forEach(i -> cache.get("key" + i)); // Expired Event(RIの場合)(このイベントは出力されない) } }
CacheEntryListenerConfigurationのインスタンスは作成するのですが、
CacheEntryListenerConfiguration<String, String> listenerConfiguration = new MutableCacheEntryListenerConfiguration<>( FactoryBuilder.factoryOf(MyCacheListener.class), // CacheEntryListener null, // CacheEntryEventFilter true, // isOldValueRequired true // isSynchronous );
これをCache#registerCacheEntryListenerで登録します。
// Listener登録
cache.registerCacheEntryListener(listenerConfiguration);
また、削除するにはCache#deregisterCacheEntryListenerを使用します。
// Listener削除
cache.deregisterCacheEntryListener(listenerConfiguration);
今回は、先ほどまでのコード例で、removeとput(UPDATED)の間だけ有効にしたので
// Listener登録 cache.registerCacheEntryListener(listenerConfiguration); cache.remove("key2"); // Removed Event(このイベントは出力される) cache.put("key3", "new-value3"); // Updated Event(このイベントは出力される) // Listener削除 cache.deregisterCacheEntryListener(listenerConfiguration);
このような出力結果になりました。
[main] onRemoved Event at[MyCacheListener], EventType[REMOVED], KeyValue = key2:value2, OldValue = null [main] onUpdated Event at[MyCacheListener], EventType[UPDATED], KeyValue = key3:new-value3, OldValue = value3
CacheEntryEventFilterで対象のイベントを絞り込む
最後は、CacheEntryEventFilterを使ってみたいと思います。CacheEntryEventFilterインターフェースの実装クラスを作成してCacheEntryListenerConfigurationに設定することで、Listenerに通知するイベントを絞り込むことができます。
今回は、CREATEDとREMOVEDを対象にしたCacheEntryEventFilterを実装してみました。
src/main/java/org/littlewings/jcache/listener/MyCacheEventFilter.java
package org.littlewings.jcache.listener; import java.io.Serializable; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.EventType; public class MyCacheEventFilter implements CacheEntryEventFilter<String, String>, Serializable { @Override public boolean evaluate(CacheEntryEvent<? extends String, ? extends String> event) throws CacheEntryListenerException { return event.getEventType() == EventType.CREATED || event.getEventType() == EventType.REMOVED; } }
これを、CacheEntryListenerConfigurationに設定します。
CacheEntryListenerConfiguration<String, String> listenerConfiguration = new MutableCacheEntryListenerConfiguration<>( FactoryBuilder.factoryOf(MyCacheListener.class), // CacheEntryListener FactoryBuilder.factoryOf(MyCacheEventFilter.class), // CacheEntryEventFilter true, // isOldValueRequired true // isSynchronous );
そのままConfigurationに適用して
Configuration<String, String> configuration = new MutableConfiguration<String, String>() .setTypes(String.class, String.class) .addCacheEntryListenerConfiguration(listenerConfiguration) .setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 3)));
あとは、普通にCacheを使います。
try (CachingProvider provider = Caching.getCachingProvider(); CacheManager manager = provider.getCacheManager(); Cache<String, String> cache = manager.createCache("listenerCache", configuration)) { IntStream .rangeClosed(1, 5) .forEach(i -> cache.put("key" + i, "value" + i)); // Created Event assertThat(cache.get("key1")) .isEqualTo("value1"); // 特にイベントなし(CacheLoaderがいない場合) cache.remove("key2"); // Removed Event cache.put("key3", "new-value3"); // Updated Event try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { // ignore } IntStream .rangeClosed(1, 5) .forEach(i -> cache.get("key" + i)); // Expired Event(RIの場合) } }
もともと用意していたListenerは、CREATED、EXPIRED、REMOVED、UPDATEDのいずれのイベントも受け取るようなListenerでしたが、今回作成したCacheEntryEventFilterを適用することで対象のイベントが絞られます。
[main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key1:value1, OldValue = null [main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key2:value2, OldValue = null [main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key3:value3, OldValue = null [main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key4:value4, OldValue = null [main] onCreated Event at[MyCacheListener], EventType[CREATED], KeyValue = key5:value5, OldValue = null [main] onRemoved Event at[MyCacheListener], EventType[REMOVED], KeyValue = key2:value2, OldValue = null
CREATEDとREMOVEDだけになりましたね。
まとめ
Cacheの操作に対してイベントを受け取ることができる、CacheEntryListenerおよび、Listenerに通知するイベントを絞り込むことができるCacheEntryEventFilterを紹介しました。
なんか使っていると挙動に実装差異がありそうな気がするのですが、OldValueとSynchronousを気にしなければよさそうな感じが…?
イベント発生の対応表は、Expireなどと合わせてそのうち整理したいですね。