CLOVER🍀

That was when it all began.

Ehcacheで、Read Through的な

ちょっと、キャッシュ関連でとある人に振ったネタがあって、「Ehcacheだとどうやって実現するのかなぁ?」と思ったのがきっかけで、試してみることにしました。

やりたいことは、

  • キャッシュにアクセスした時に、キーに対応する値が存在しなければ何かしらのロジックでロードする
  • ロードした値は、キャッシュに格納する
  • それ以降は、キャッシュにロードされた値を使用する
  • キャッシュに登録後、有効期限が切れたり削除されたりしてなくなった後に、再度アクセスされたらもう1度ロードする

といった感じです。

普通にクライアントコード的な視点で考えると、
*コードのイメージは、Ehcacheです

Cache cache = ....;

if (cache.isKeyInCache(key)) {
    // キャッシュにすでに存在すれば、それを使う
    return cache.get(key).getObjectValue();
} else {
    // キャッシュになければどこから値を取得して、キャッシュに入れる
    Object value = ....;
    cache.put(new Element(key, value));
    return value;
}

みたいな感じで書くと思いますが、これが透過的にできると嬉しいケースもあるでしょう。

// get時に、ロードもやってしまう
Element elemnt = cache.get(key);

これを、Ehcacheでやってみます。

Maven依存関係

まずは、Ehcacheの依存関係。

    <dependency>
      <groupId>net.sf.ehcache</groupId>
      <artifactId>ehcache</artifactId>
      <version>2.7.4</version>
    </dependency>

あとは、テストコードに使ったのでJUnitとHamcrestを追加しておきました。

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-all</artifactId>
      <version>1.3</version>
      <scope>test</scope>
    </dependency>

SelfPopulatingCache

Ehcacheで、先ほどの動作を実現するには、SelfPopulatingCacheを使用するようです。

BlockingCache and SelfPopulatingCache
http://ehcache.org/documentation/apis/constructs

SelfPopulatingCacheを使用するには、まずはCacheEntryFactoryインターフェースを実装したクラスを用意します。
src/main/java/ehcache/cacheloader/blocking/SimpleCacheEntryFactory.java

package ehcache.cacheloader.blocking;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

import net.sf.ehcache.CacheException;
import net.sf.ehcache.constructs.blocking.CacheEntryFactory;

public class SimpleCacheEntryFactory implements CacheEntryFactory {
    private AtomicInteger counter = new AtomicInteger();

    public int getLoadCount() {
        return counter.get();
    }

    @Override
    public Object createEntry(Object key) {
        if (!(key instanceof String)) {
            throw new IllegalArgumentException("Key must be String!");
        }

        try {
            Thread.sleep(5 * 1000L);
        } catch (InterruptedException e) { }

        InputStream is = null;

        try {
            is = getClass().getClassLoader().getResourceAsStream(key.toString());

            if (is == null) {
                return null;
            }

            try (InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
                 BufferedReader reader = new BufferedReader(isr)) {
                StringBuilder builder = new StringBuilder();
                int c;
                while ((c = reader.read()) != -1) {
                    builder.append((char) c);
                }

                counter.incrementAndGet();
                return builder.toString();
            } catch (IOException e) {
                throw new CacheException(e);
            }
        } finally {
            if (is != null) {
                try {
                    is.close();
                } catch (IOException e) { }
            }
        }
    }
}

ごくごく簡単な、クラスパス上にキーの名前そのままのファイルがあって、その中に値がテキストとして書かれていることを想定したクラスです。

キーに対応する値がない場合は、nullを返却します。あと、何気に5秒間のスリープが入っています。

というわけで、テスト用にこんなのを用意しました。
src/test/resources/key1

value1

src/test/resources/key2

value2

はい。Ehcacheの設定ファイルは、最後に載せます。

では、テストコードを載せます。まずは、import文などから。
src/test/java/ehcache/cacheloader/blocking/SimpleCacheEntryFactoryTest.java

package ehcache.cacheloader.blocking;

import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.number.OrderingComparison.*;

import java.util.List;

import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
import net.sf.ehcache.constructs.blocking.BlockingCache;
import net.sf.ehcache.constructs.blocking.CacheEntryFactory;
import net.sf.ehcache.constructs.blocking.SelfPopulatingCache;

import org.junit.Test;

public class SimpleCacheEntryFactoryTest {
    // テストコード
}

ここに、テストコードを書いていきます。

単純な使い方

通常のEhcacheの使い方と変わるのは、CacheManagerから取得したCacheのインスタンスを、SelfPopulatingCacheに渡してしまうことです。

    @Test
    public void entryFactoryTest() {
        CacheManager cacheManager = CacheManager.newInstance();

        try {
            SimpleCacheEntryFactory entryFactory
                = new SimpleCacheEntryFactory();
            // SelfPopulatingCacheは、Cacheのサブクラスではない
            Ehcache cache
                = new SelfPopulatingCache(cacheManager.getCache("withEntryFactoryCache"),
                                          entryFactory);

            // some code...
        } finally {
            cacheManager.shutdown();
        }
    }

この時、SelfPopulatingCacheのコンストラクタに先ほど実装したCacheEntryFactoryの実装クラスも指定する必要があります。

また、SelfPopulatingCacheクラスはEhcacheインターフェースの実装ではありますが、Cacheクラスのサブクラスではないので、今回はEhcacheインターフェースとして使用します。

あとは、Ehcache#getすれば、裏でCacheEntryFactory#createEntryが呼び出されることになります。

            // Ehcache#getで、CacheEntryFactory#createEntryが呼び出される
            assertThat((String) cache.get("key1").getObjectValue(),
                       is("value1"));
            assertThat(entryFactory.getLoadCount(), is(1));

            // Cacheに乗っていれば、Ehcache#getしてもcreateEntryは呼び出されない
            assertThat((String) cache.get("key1").getObjectValue(),
                       is("value1"));
            assertThat(entryFactory.getLoadCount(), is(1));

            // エントリを削除して再度getすると、createEntryが呼び出される
            cache.remove("key1");
            assertThat((String) cache.get("key1").getObjectValue(),
                       is("value1"));
            assertThat(entryFactory.getLoadCount(), is(2));

            // 存在しないキーへのgetは、もちろんnull
            assertThat(cache.get("not-exists-key").getObjectValue(),
                       nullValue());

ただ、さらっとこんなことを書いていますが

            // 存在しないキーへのgetは、もちろんnull
            assertThat(cache.get("not-exists-key").getObjectValue(),
                       nullValue());

通常、Cacheを使っている場合はCache#getの結果自体がnullになるので、少し動きが違うことになりますね…。

並行アクセス

では、さらに複数スレッドでアクセスしてみましょう。こんなスレッドクラスを用意します。

    private static class WorkerThread extends Thread {
        private Ehcache cache;
        private long elapsedTime;
        private String value;

        WorkerThread(String name, Ehcache cache) {
            super(name);
            this.cache = cache;
        }

        @Override
        public void run() {
            long startTime = System.currentTimeMillis();
            value = (String) cache.get("key2").getObjectValue();
            elapsedTime = System.currentTimeMillis() - startTime;
        }

        public String getValue() {
            return value;
        }

        public long getElapsedTime() {
            return elapsedTime;
        }
    }

キー「key2」に対して値の取得を行うのと、その時の実行時間を計測しています。

同じキーに対して、並行でgetするコード。

    @Test
    public void concurrentEntryFactoryTest() {
        CacheManager cacheManager = CacheManager.newInstance();

        try {
            SimpleCacheEntryFactory entryFactory
                = new SimpleCacheEntryFactory();
            Ehcache cache
                = new SelfPopulatingCache(cacheManager.getCache("withEntryFactoryCache"),
                                          entryFactory);
            // 並行アクセス
            WorkerThread wt1 = new WorkerThread("thread-1", cache);
            WorkerThread wt2 = new WorkerThread("thread-2", cache);

            long startTime = System.currentTimeMillis();

            wt1.start();
            // 3秒後、スレッド2をスタート
            sleep(3);
            wt2.start();

            // 待ち合わせ
            try {
                wt1.join();
                wt2.join();
            } catch (InterruptedException e) { }

            // 時間計測
            long elapsedTime = System.currentTimeMillis() - startTime;

            // スレッド1は、5秒少々時間がかかっている
            assertThat(wt1.getValue(), is("value2"));
            assertThat(wt1.getElapsedTime(), lessThan(6 * 1000L));

            // スレッド2は、3秒以内で終わっている
            assertThat(wt2.getValue(), is("value2"));
            assertThat(wt2.getElapsedTime(), lessThan(3 * 1000L));

            // 全体としての経過時間は、1番目のスレッドとほぼ同じ
            assertThat(elapsedTime, lessThan(6 * 1000L));
            // CacneEntryFactory#createEntryの呼び出し回数は1度だけ
            assertThat(entryFactory.getLoadCount(), is(1));
        } finally {
            cacheManager.shutdown();
        }
    }

    private void sleep(long sec) {
        try {
            Thread.sleep(sec * 1000L);
        } catch (InterruptedException e) {
        }
    }

結果から、最初にスタートしたスレッドは処理に5秒間かかる(自前のCacheEntryFactoryが5秒スリープするから)わけですが、さらに3秒後にスタートしたスレッドは、その差分の時間で処理が終了します。

つまり、SelfPopulatingCacheを使用した場合は、

  • 同じキーに対して並行にアクセスし、かつキャッシュに値が存在しなかった場合はスレッドがブロックする
  • 最初にgetしたスレッドが値を取得したところで、他に待っていたスレッドは、その値を利用する

みたいな動きになるわけですね。

まあ、この動作は良い時もあれば、悪い時(キャッシュにない場合は、すぐに諦めて処理を進めたい時など)もあると思いますので、利用はケースバイケース…というか、キャッシュを利用すること自体そういうものかなとも思いますが。

知っておくと便利かもしれません。

今回利用した設定ファイルは、こちらになります。
src/main/resources/ehcache.xml

<?xml version="1.0" encoding="UTF-8"?>
<ehcache>
  <diskStore path="java.io.tmpdir"/>
  <defaultCache
      maxEntriesLocalHeap="10000"
      eternal="false"
      timeToIdleSeconds="120"
      timeToLiveSeconds="120"
      maxEntriesLocalDisk="10000000"
      diskExpiryThreadIntervalSeconds="120"
      memoryStoreEvictionPolicy="LRU"
      />

  <cache
      name="withEntryFactoryCache"
      maxEntriesLocalHeap="10000"
      eternal="false"
      timeToIdleSeconds="120"
      timeToLiveSeconds="120"
      maxEntriesLocalDisk="10000000"
      diskExpiryThreadIntervalSeconds="120"
      memoryStoreEvictionPolicy="LRU"
      />

  <cache
      name="withLoaderCache"
      maxEntriesLocalHeap="10000"
      eternal="false"
      timeToIdleSeconds="120"
      timeToLiveSeconds="120"
      maxEntriesLocalDisk="10000000"
      diskExpiryThreadIntervalSeconds="120"
      memoryStoreEvictionPolicy="LRU">
    <cacheLoaderFactory class="ehcache.cacheloader.SimpleCacheLoaderFactory"
                        properties="propertyName=propertyValue"/>
  </cache>
</ehcache>

ひとつ、使っていないキャッシュがありますが、それはオマケで。

オマケ)CacheLoader

最初に探したのは、SelfPopulatingCacheではなく実はこちらでした。

Cache Loaders
http://ehcache.org/documentation/2.4/user-guide/cache-loaders

こちらは、SelfPopulatingCacheに似ていますが、ちょっと違います。

CacheLoaderを使用するには、ファクトリクラスとそこから生成されるCacheLoaderFactoryの実装クラスが必要です。

まずはCacheLoaderFactoryクラスの実装。
src/main/java/ehcache/cacheloader/SimpleCacheLoaderFactory.java

package ehcache.cacheloader;

import java.util.Properties;

import net.sf.ehcache.Ehcache;
import net.sf.ehcache.loader.CacheLoader;
import net.sf.ehcache.loader.CacheLoaderFactory;

public class SimpleCacheLoaderFactory extends CacheLoaderFactory {
    @Override
    public CacheLoader createCacheLoader(Ehcache cache, Properties properties) {
        return new SimpleCacheLoader(properties);
    }
}

そこから作成される、CacheLoaderの実装。やっていることは、先ほどのSelfPopulatingCacheで使用した、SimpleCacheEntryFactoryクラスとさほど変わりません。ただ、ステータス管理があったりと実装量は多いですが…。
src/main/java/ehcache/cacheloader/SimpleCacheLoader.java

package ehcache.cacheloader;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import net.sf.ehcache.CacheException;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Status;
import net.sf.ehcache.loader.CacheLoader;

public class SimpleCacheLoader implements CacheLoader {
    private Status currentStatus = Status.STATUS_UNINITIALISED;
    private String name = getClass().getSimpleName();
    private AtomicInteger counter = new AtomicInteger();

    public SimpleCacheLoader(Properties properties) {
    }

    public int getLoadCount() {
        return counter.get();
    }

    @Override
    public Status getStatus() {
        return currentStatus;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public void init() {
        currentStatus = Status.STATUS_ALIVE;
    }

    @Override
    public Object load(Object key) throws CacheException {
        return load(key, null);
    }

    @Override
    public Object load(Object key, Object argument) throws CacheException {
        if (!(key instanceof String)) {
            throw new IllegalArgumentException("Key must be String!");
        }

        InputStream is = null;

        try {
            is = getClass().getClassLoader().getResourceAsStream(key.toString());

            if (is == null) {
                return null;
            }

            try (InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
                 BufferedReader reader = new BufferedReader(isr)) {
                StringBuilder builder = new StringBuilder();
                int c;
                while ((c = reader.read()) != -1) {
                    builder.append((char) c);
                }

                counter.incrementAndGet();
                return builder.toString();
            } catch (IOException e) {
                throw new CacheException(e);
            }
        } finally {
            if (is != null) {
                try {
                    is.close();
                } catch (IOException e) { }
            }
        }
    }

    @Override
    public Map loadAll(Collection keys) throws CacheException {
        return loadAll(keys, null);
    }

    @Override
    public Map loadAll(Collection keys, Object argument) throws CacheException {
        Map<Object, Object> map = new HashMap<>();
        for (Object key : keys) {
            map.put(key, load(key));
        }
        return map;
    }

    @Override
    public CacheLoader clone(Ehcache cache) throws CloneNotSupportedException {
        throw new CloneNotSupportedException("Not Support.");
    }

    @Override
    public void dispose() throws CacheException {
        currentStatus = Status.STATUS_SHUTDOWN;
    }
}

こちらは、スリープは入っていません。

動作確認用のテストコード。
src/test/java/ehcache/cacheloader/SimpleCacheLoaderTest.java

package ehcache.cacheloader;

import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.*;

import java.util.List;

import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;
import net.sf.ehcache.loader.CacheLoader;

import org.junit.Test;

public class SimpleCacheLoaderTest {
    @Test
    public void loaderTest() {
        CacheManager cacheManager = CacheManager.newInstance();

        try {
            Cache cache = cacheManager.getCache("withLoaderCache");

            List<CacheLoader> cacheLoaders
                = cache.getRegisteredCacheLoaders();

            // CacheLoaderが登録されている
            assertThat(cacheLoaders.size(), is(1));
            assertThat(cacheLoaders.get(0), isA(CacheLoader.class));

            SimpleCacheLoader loader = (SimpleCacheLoader) cacheLoaders.get(0);

            // 単にCache#getでは、何も起こらない
            assertThat(cache.get("key"), nullValue());

            //////////////////////////////////
            // 明示的にロードする必要がある
            cache.load("key1");
            // loadは非同期なので、待ち合わせが必要
            sleep(1);

            assertThat((String) cache.get("key1").getObjectValue(),
                       is("value1"));

            // 1度呼ばれている
            assertThat(loader.getLoadCount(), is(1));


            //////////////////////////////////
            // もう1度、Cache#loadを呼び出してみる
            cache.load("key1");
            sleep(1);

            // ロード済みの場合は、呼び出されない
            assertThat(loader.getLoadCount(), is(1));


            //////////////////////////////////
            // 削除
            cache.remove("key1");

            // 再ロード
            cache.load("key1");
            sleep(1);

            // removeしたので、再度ロードが行われる
            assertThat(loader.getLoadCount(), is(2));
        } finally {
            cacheManager.shutdown();
        }
    }

    private void sleep(long sec) {
        try {
            Thread.sleep(sec * 1000L);
        } catch (InterruptedException e) {
        }
    }
}

CacheLoaderFactory自体は、先ほどのehcache.xmlに書いていましたが、設定で登録します。

実際に使うにあたってのSelfPopulatingCacheとの違いは、ここですね。

            // 単にCache#getでは、何も起こらない
            assertThat(cache.get("key"), nullValue());

            //////////////////////////////////
            // 明示的にロードする必要がある
            cache.load("key1");

Cache#getではロードされず、Cache#loadで明示的にロードする必要があるということです。

しかも、非同期です!

            // 明示的にロードする必要がある
            cache.load("key1");
            // loadは非同期なので、待ち合わせが必要
            sleep(1);

            assertThat((String) cache.get("key1").getObjectValue(),
                       is("value1"));

これ、Read Behind的な感じな気もしますが、明示的にloadが要るってちょっと微妙なような…。

そもそも、現在のEhcacheのドキュメント上は、Cache Loadersは載っていませんしね。現在は、あまり使われることを想定していないのかもしれません。