CLOVER🍀

That was when it all began.

Hibernate Search+Infinispanでクラスタを構成した時の、インデックスの更新について

以前、少し動かしては「なんかダメっぽい」程度の印象で止めていた、このテーマをもう少し見てみることにしました。

内容としては、

Hibernate Searchで使うLuceneのインデックスの保存先をInfinispanにして、なおかつクラスタを構成した時のインデックスの更新時の挙動について

です。

以前、この内容でクラスタを構成した時、EJBやSpringのComponentの初期化時にMassIndexerを仕込んでいると、後から起動した方がロックを取れずに負けるので、「そうなのか〜」程度に捉えていました。が、もう少し確認してみようかなと。

結論を先に

結果から言ってしまうと、Hibernate Search(バックエンドをInfinispan)でクラスタを構成した時、インデックスを更新できるのはひとつのNodeとなり、そこまではいいのですがそれ以後も同じNodeでインデックスを更新しなくてはいけないようです。

インデックスの分散保持というよりは、マスター/スレーブな感じですね。

Spring Bootで軽くサンプルを書いてみました。設定とかは、後で載せます。

Cacheのモードは、Distributed Cacheです(こちらも、後で載せます)。

Controller。操作できるのは、Entityの登録&インデックスの更新、検索、そしてインデックスの再構築です。
src/main/java/org/littlewings/springboot/hibernate/ContentsController.java

package org.littlewings.springboot.hibernate;

import java.util.List;
import java.util.Map;
import javax.persistence.EntityManager;

import org.apache.lucene.search.Query;
import org.hibernate.search.MassIndexer;
import org.hibernate.search.jpa.FullTextEntityManager;
import org.hibernate.search.jpa.Search;
import org.hibernate.search.query.dsl.QueryBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Transactional
@RequestMapping("contents")
public class ContentsController {
    @Autowired
    private EntityManager entityManager;

    @RequestMapping(value = "registry", method = RequestMethod.POST)
    public void registry(@RequestBody List<Contents> contentsList) {
        contentsList.forEach(entityManager::persist);
    }

    @RequestMapping(value = "search", method = RequestMethod.POST)
    @SuppressWarnings("unchecked")
    public List<Contents> search(@RequestBody Map<String, String> request) {
        String word = request.get("word");

        FullTextEntityManager fullTextEntityManager = createFullTextEntityManager();

        QueryBuilder qb =
                fullTextEntityManager
                        .getSearchFactory()
                        .buildQueryBuilder()
                        .forEntity(Contents.class)
                        .get();

        Query query;
        if (word == null || word.isEmpty()) {
            query = qb.all().createQuery();
        } else {
            query = qb.keyword().onField("value").matching(word).createQuery();
        }

        return fullTextEntityManager
                .createFullTextQuery(query, Contents.class)
                .getResultList();
    }

    @RequestMapping("reindex")
    public String reindex() throws InterruptedException {
        FullTextEntityManager fullTextEntityManager = createFullTextEntityManager();
        MassIndexer indexer = fullTextEntityManager.createIndexer();
        indexer.startAndWait();
        return "Finish!!";
    }

    private FullTextEntityManager createFullTextEntityManager() {
        return Search.getFullTextEntityManager(entityManager);
    }
}

Entityは、このような定義のものを使います。
src/main/java/org/littlewings/springboot/hibernate/Contents.java

package org.littlewings.springboot.hibernate;

import java.io.Serializable;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

import org.hibernate.search.annotations.Field;
import org.hibernate.search.annotations.Indexed;

@Entity
@Table(name = "contents")
@Indexed
public class Contents implements Serializable {
    private static final long serialVersionUID = 1L;

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Long id;

    @Column
    @Field
    private String value;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

テーブル定義は、このように。

mysql> DESC contents;
+-------+--------------+------+-----+---------+----------------+
| Field | Type         | Null | Key | Default | Extra          |
+-------+--------------+------+-----+---------+----------------+
| id    | int(11)      | NO   | PRI | NULL    | auto_increment |
| value | varchar(255) | YES  |     | NULL    |                |
+-------+--------------+------+-----+---------+----------------+
2 rows in set (0.08 sec)

で、こちらをパッケージングして、2つNodeを起動してみます。
※JARファイル名は適当

## Node1
$ java -jar target/app-0.0.1-SNAPSHOT.jar

## Node2
$ java -jar target/app-0.0.1-SNAPSHOT.jar --server.port=9080

起動途中で、クラスタが構成されます。

2015-08-08 01:16:22.999  INFO 67540 --- [xxxxx-25497] o.i.r.t.jgroups.JGroupsTransport         : ISPN000094: Received new cluster view for channel cluster: [xxxxx-25497|1] (2) [xxxxx-25497, xxxxx-925]

リクエストは、このようなJSONを投げてみます。
request1.json

[
  {"value": "はじめてのSpring Boot"},
  {"value": "高速スケーラブル検索エンジン ElasticSearch Server"},
  {"value": "わかりやすいJava EE ウェブシステム入門"}
]

まずは、Node 1にデータを登録してみます。

$ curl -XPOST -H 'Content-Type: application/json' http://localhost:8080/contents/registry --data-binary @request1.json

Node 1、Node 2でそれぞれ検索。

## Node 1
$ curl -XPOST -H 'Content-Type: application/json' http://localhost:8080/contents/search -d '{"word": "spring"}'
[{"id":1,"value":"はじめてのSpring Boot"}]

## Node 2
$ curl -XPOST -H 'Conten:9080/contents/search -d '{"word": "spring"}'
[{"id":1,"value":"はじめてのSpring Boot"}]

両方共OKですね。

それでは、Node 2側に今度は以下のようなデータを登録してみます。
request2.json

[
  {"value": "[改訂新版] Apache Solr入門 〜オープンソース全文検索エンジン"},
  {"value": "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava (Programmer’s SELECTION)"},
  {"value": "Spring3入門 ――Javaフレームワーク・より良い設計とアーキテクチャ"}
]

実行。

$ curl -XPOST -H 'Content-Type: application/json' http://localhost:9080/contents/registry --data-binary @request2.json

すると、裏でコケております…。

2015-08-08 01:20:13.347 ERROR 67607 --- [ernate.Contents] o.h.s.exception.impl.LogErrorHandler     : HSEARCH000058: Exception occurred org.apache.lucene.store.LockObtainFailedException: Lock obtain timed out: org.infinispan.lucene.locking.BaseLuceneLock@34e01b86
Primary Failure:
	Entity org.littlewings.springboot.hibernate.Contents  Id 4  Work Type  org.hibernate.search.backend.AddLuceneWork
Subsequent failures:
	Entity org.littlewings.springboot.hibernate.Contents  Id 5  Work Type  org.hibernate.search.backend.AddLuceneWork
	Entity org.littlewings.springboot.hibernate.Contents  Id 6  Work Type  org.hibernate.search.backend.AddLuceneWork


org.apache.lucene.store.LockObtainFailedException: Lock obtain timed out: org.infinispan.lucene.locking.BaseLuceneLock@34e01b86
	at org.apache.lucene.store.Lock.obtain(Lock.java:89)
	at org.apache.lucene.index.IndexWriter.<init>(IndexWriter.java:755)
	at org.hibernate.search.backend.impl.lucene.IndexWriterHolder.createNewIndexWriter(IndexWriterHolder.java:131)
	at org.hibernate.search.backend.impl.lucene.IndexWriterHolder.getIndexWriter(IndexWriterHolder.java:97)
	at org.hibernate.search.backend.impl.lucene.AbstractWorkspaceImpl.getIndexWriter(AbstractWorkspaceImpl.java:112)
	at org.hibernate.search.backend.impl.lucene.LuceneBackendQueueTask.applyUpdates(LuceneBackendQueueTask.java:81)
	at org.hibernate.search.backend.impl.lucene.LuceneBackendQueueTask.run(LuceneBackendQueueTask.java:47)
	at org.hibernate.search.backend.impl.lucene.SyncWorkProcessor$Consumer.applyChangesets(SyncWorkProcessor.java:145)
	at org.hibernate.search.backend.impl.lucene.SyncWorkProcessor$Consumer.run(SyncWorkProcessor.java:135)
	at java.lang.Thread.run(Thread.java:745)

2015-08-08 01:20:13.347 ERROR 67607 --- [ernate.Contents] o.h.s.b.i.lucene.LuceneBackendQueueTask  : HSEARCH000072: Couldn't open the IndexWriter because of previous error: operation skipped, index ouf of sync!

ロックが取れなくて、Node 2側でIndexWriterが開けないようです。

で、データはどうなったかというと、きちんと登録されているようです。

mysql> SELECT * FROM contents;
+----+----------------------------------------------------------------------------------------------------+
| id | value                                                                                              |
+----+----------------------------------------------------------------------------------------------------+
|  1 | はじめてのSpring Boot                                                                              |
|  2 | 高速スケーラブル検索エンジン ElasticSearch Server                                                  |
|  3 | わかりやすいJava EE ウェブシステム入門                                                             |
|  4 | [改訂新版] Apache Solr入門 〜オープンソース全文検索エンジン                                        |
|  5 | Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava (Programmer’s SELECTION)               |
|  6 | Spring3入門 ――Javaフレームワーク・より良い設計とアーキテクチャ                                     |
+----+----------------------------------------------------------------------------------------------------+
6 rows in set (0.00 sec)

でも、全文検索では結果に現れません。

$ curl -XPOST -H 'Content-Type: application/json' http://localhost:8080/contents/search -d '{"word": "spring"}'
[{"id":1,"value":"はじめてのSpring Boot"}]
$ curl -XPOST -H 'Content-Type: application/json' http://localhost:9080/contents/search -d '{"word": "spring"}'
[{"id":1,"value":"はじめてのSpring Boot"}]

インデックスの更新のみ、失敗しているようですね。

ここで、Node 1にインデックスの再構築を指示すると

$ curl -H 'Content-Type: application/json' http://localhost:8080/contents/reindex
Finish!!

結果に現れるようになります。

## Node 1
$ curl -XPOST -H 'Content-Type: application/json' http://localhost:8080/contents/search -d '{"word": "spring"}'
[{"id":1,"value":"はじめてのSpring Boot"},{"id":6,"value":"Spring3入門 ――Javaフレームワーク・より良い設計とアーキテクチャ"},{"id":9,"value":"Spring3入門 ――Javaフレームワーク・より良い設計とアーキテクチャ"}]

## Node 2
$ curl -XPOST -H 'Content-Type: application/json' http://localhost:9080/contents/search -d '{"word": "spring"}'
[{"id":1,"value":"はじめてのSpring Boot"},{"id":6,"value":"Spring3入門 ――Javaフレームワーク・より良い設計とアーキテクチャ"},{"id":9,"value":"Spring3入門 ――Javaフレームワーク・より良い設計とアーキテクチャ"}]

データ自体は、テーブルに入っていますからね…。

Node 2は、インデックスを更新できないだけでなく、インデックスの再構築(MassIndexer)もできません。

今回は、Node 1で更新 → 検索 → Node 2で更新 → 失敗 → Node 1でMassIndexer と実行しましたが、Node 1で更新 → 検索 → Node 2でMassIndexer とやっても、やっぱりロックが取れずに失敗します(この場合は、Node 2のMassIndexerが何回か失敗し続けてくれますが…)。

今まで、検索してみてどう?くらいしかしていなかったので、ちゃんと確認してよかったなーという気になりました。

Luceneのインデックス更新を伴う場合は、どれかひとつのNodeに固定してリクエストを投げないと、うまくいかないということですね。もちろん、その時更新可能なNodeがダウンしたりすると、ロックが外れるので他のNodeも更新できるようになります。

もちろん、その後に更新したNodeが、ロックを持ち続けるのですが。

この単一Nodeしか更新できないという制限、ちゃんと実感してこなかったので、よいきっかけになりました。

今回はHibernate Searchの裏にInfinispanがいるという感じですが、InfinispanのクエリモジュールとしてHibernate Searchを使うとどうなるのかは、ちょっと気になりますね。

Querying Infinispan
http://infinispan.org/docs/7.2.x/user_guide/user_guide.html#sid-68355061

Automatic configuration
http://infinispan.org/docs/7.2.x/user_guide/user_guide.html#_automatic_configuration

この中の設定項目で特に気になるのは、「hibernate.search.default.indexmanager」ですね。また今度、確認してみましょう。

追記
確認してみました。結果は、こちら。

Infinispan+Hibernate Searchでクラスタを構成した時の、インデックスの更新について
http://d.hatena.ne.jp/Kazuhira/20150808/1439036374

オマケ

先ほど動作させたプログラムですが、紹介しなかったpom.xmlや他のコードを載せておきます。先ほど紹介したコードは、ここでは端折ります。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.littlewings</groupId>
    <artifactId>hibernate-search-integration-infinispan-clustered</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spring.boot.version>1.2.5.RELEASE</spring.boot.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring.boot.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-search-orm</artifactId>
            <version>5.3.0.Final</version>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-search-infinispan</artifactId>
            <version>5.3.0.Final</version>
        </dependency>
        <dependency>
            <groupId>org.apache.lucene</groupId>
            <artifactId>lucene-analyzers-kuromoji</artifactId>
            <version>4.10.4</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.36</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
</project>

ムダにKuromoji入り。

プログラムのエントリポイント。
src/main/java/org/littlewings/springboot/hibernate/App.java

package org.littlewings.springboot.hibernate;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

Spring Bootの設定ファイル。
src/main/resources/application.yml

spring:
  datasource:
    driverClassName: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/practice?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false
    username: kazuhira
    password: password
    testOnBorrow: true
    validationQuery: SELECT 1
  jpa:
    hibernate.ddl-auto: none
    properties:
      hibernate:
        show_sql: true
        format_sql: true
        search:
          default:
            directory_provider: infinispan
          infinispan.configuration_resourcename: infinispan.xml
          analyzer: org.apache.lucene.analysis.ja.JapaneseAnalyzer
          lucene_version: LUCENE_4_10_4

Infinispanの設定ファイル。
src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:7.2 http://www.infinispan.org/schemas/infinispan-config-7.2.xsd"
        xmlns="urn:infinispan:config:7.2">
    <jgroups>
        <stack-file name="udp" path="jgroups.xml"/>
    </jgroups>

    <cache-container name="indexingCacheManager" shutdown-hook="DONT_REGISTER">
      <transport cluster="cluster" stack="udp" />

      <distributed-cache name="LuceneIndexesData" mode="SYNC" remote-timeout="25000">
        <transaction mode="NONE"/>
        <state-transfer enabled="true" timeout="480000" await-initial-transfer="true" />
        <indexing index="NONE" />
        <locking striping="false" acquire-timeout="10000" concurrency-level="500" write-skew="false" />
        <eviction max-entries="-1" strategy="NONE"/>
        <expiration max-idle="-1"/>
      </distributed-cache>

      <replicated-cache name="LuceneIndexesMetadata" mode="SYNC" remote-timeout="25000">
        <transaction mode="NONE"/>
        <state-transfer enabled="true" timeout="480000" await-initial-transfer="true" />
        <indexing index="NONE" />
        <locking striping="false" acquire-timeout="10000" concurrency-level="500" write-skew="false" />
        <eviction max-entries="-1" strategy="NONE"/>
        <expiration max-idle="-1"/>
      </replicated-cache>

      <replicated-cache name="LuceneIndexesLocking" mode="SYNC" remote-timeout="25000">
        <transaction mode="NONE"/>
        <state-transfer enabled="true" timeout="480000" await-initial-transfer="true" />
        <indexing index="NONE" />
        <locking striping="false" acquire-timeout="10000" concurrency-level="500" write-skew="false" />
        <eviction max-entries="-1" strategy="NONE"/>
        <expiration max-idle="-1"/>
      </replicated-cache>
    </cache-container>
</infinispan>

JGroupsの設定ファイル。
src/main/resources/jgroups.xml

<?xml version="1.0" encoding="UTF-8"?>
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.6.xsd">
    <UDP
            mcast_addr="${jgroups.udp.mcast_addr:228.6.7.8}"
            mcast_port="${jgroups.udp.mcast_port:46655}"
            ucast_recv_buf_size="150k"
            ucast_send_buf_size="130k"
            mcast_recv_buf_size="150k"
            mcast_send_buf_size="130k"
            ip_ttl="${jgroups.ip_ttl:2}"
            thread_naming_pattern="pl"
            enable_diagnostics="false"

            thread_pool.min_threads="${jgroups.thread_pool.min_threads:2}"
            thread_pool.max_threads="${jgroups.thread_pool.max_threads:30}"
            thread_pool.keep_alive_time="60000"
            thread_pool.queue_enabled="false"

            internal_thread_pool.min_threads="${jgroups.internal_thread_pool.min_threads:5}"
            internal_thread_pool.max_threads="${jgroups.internal_thread_pool.max_threads:20}"
            internal_thread_pool.keep_alive_time="60000"
            internal_thread_pool.queue_enabled="true"
            internal_thread_pool.queue_max_size="500"

            oob_thread_pool.min_threads="${jgroups.oob_thread_pool.min_threads:20}"
            oob_thread_pool.max_threads="${jgroups.oob_thread_pool.max_threads:200}"
            oob_thread_pool.keep_alive_time="60000"
            oob_thread_pool.queue_enabled="false"
            />

    <PING/>
    <MERGE3 min_interval="10000"
            max_interval="30000"
            />
    <FD_SOCK/>
    <FD_ALL timeout="60000"
            interval="15000"
            timeout_check_interval="5000"
            />
    <VERIFY_SUSPECT timeout="5000"
            />

    <pbcast.NAKACK2 xmit_interval="1000"
                    xmit_table_num_rows="50"
                    xmit_table_msgs_per_row="1024"
                    xmit_table_max_compaction_time="30000"
                    max_msg_batch_size="100"
                    resend_last_seqno="true"
            />

    <UNICAST3 xmit_interval="500"
              xmit_table_num_rows="50"
              xmit_table_msgs_per_row="1024"
              xmit_table_max_compaction_time="30000"
              max_msg_batch_size="100"
              conn_expiry_timeout="0"
            />
    <pbcast.STABLE stability_delay="500"
                   desired_avg_gossip="5000"
                   max_bytes="1M"
            />
    <pbcast.GMS print_local_addr="true"
                join_timeout="15000"
            />

    <!-- <tom.TOA/> -->
    <!-- the TOA is only needed for total order transactions-->

    <UFC max_credits="2m"
         min_threshold="0.40"
            />
    <MFC max_credits="2m"
         min_threshold="0.40"
            />
    <FRAG2/>
</config>

以上でしたー。