CLOVER🍀

That was when it all began.

Elasticsearch 5.0.0のREST Clientを試す

Elasticsearch 5.0.0から、REST API用のJava向けClientが追加されたということで。

https://www.elastic.co/guide/en/elasticsearch/reference/current/release-notes-5.0.0.html

これまで、ElasticsearchのオフィシャルなJava向けのClientといえば、NodeClient API(データを持たないClient API)であり、REST APIを使おうと思うとサードパーティ製(例えばJest)を使うことになっていたと思うのですが、5.0.0になってようやくオフィシャルにも追加されたと。

というわけで、まずは試しということで使ってみました。

まあ、感想としては、まだJestを覆すにはちょっと厳しいのでは…という気はしますが。

REST Client自体のドキュメントは、こちらです。

Java REST Client [5.0] | Elastic

準備

ElasticsearchのREST Clientを使うためのMaven依存関係。

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>rest</artifactId>
            <version>5.0.0</version>
        </dependency>

あと、テスト用としてJUnitとAssertJを

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.2</version>
            <scope>test</scope>
        </dependency>

JSON変換用として、Jacksonを追加。

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.8.4</version>
        </dependency>

なんでJackson要るんよって気がしないでもないですが…。それは後述。

REST Clientの依存関係

REST Clientが、どんなライブラリに依存しているかは、こちらのドキュメントに記述があります。

Dependencies | Java REST Client [5.0] | Elastic

要は、Apache HttpComponents(HttpClient)です。それに、Commons CodecとCommons Logging。

Apache HttpComponentsは、裏で使われてるのかなぁと思いきや、けっこうがっつりと表に出てきます。

使ってみる

それでは、使ってみましょう。テストコードの雛形は、こちら。
src/test/java/org/littlewings/elasticsearch/client/ElasticsearchRestClientTest.java

package org.littlewings.elasticsearch.client;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.HeapBufferedAsyncResponseConsumer;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class ElasticsearchRestClientTest {
    // ここに、テストを書く!!
}

リクエストを投げる時に使うAPIについては、こちら。
Performing requests | Java REST Client [5.0] | Elastic

レスポンスの読み出しに使うAPIは、こちら。
Reading responses | Java REST Client [5.0] | Elastic

REST Clientを使ったサンプルは、こちら。
Example requests | Java REST Client [5.0] | Elastic

ドキュメントを読むとわかりますが、リクエストを投げるためのAPIはけっこうLow Levelです。最後のサンプルを見ると、JSONを文字列で渡してHttpEntityにしていますしね。レスポンスについては、Apache HttpComponentsそのままです。

というわけで、「とりあえず公式にもHTTPクライアントできたよー」くらいな感じな気が…。

まあ、今回は気にせずいってみます。

JSON変換のところは、Jacksonで行うことにします。これが、依存関係にJacksonを入れていた理由です。

    ObjectMapper mapper = new ObjectMapper();

最初は、ドキュメントの登録。

    @Test
    public void putDocumentSimple() throws IOException {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200)).build()) {
            Map<String, String> article = new HashMap<>();
            article.put("title", "My First Post!!");
            article.put("content", "Hello Elasticsearch RestClient!!");

            HttpEntity entity = new NStringEntity(mapper.writeValueAsString(article), ContentType.APPLICATION_JSON);

            Response response = client.performRequest("PUT", "/blog/article/1", Collections.emptyMap(), entity);

            assertThat(response.getStatusLine().getStatusCode())
                    .isEqualTo(HttpStatus.SC_CREATED);

            Map<String, String> responseEntity = mapper.readValue(response.getEntity().getContent(), Map.class);
            assertThat(responseEntity.get("_index"))
                    .isEqualTo("blog");
            assertThat(responseEntity.get("_type"))
                    .isEqualTo("article");
        }
    }

RestClient自体は、RestClientBuilderにより構築するのですが、RestClientBuilderは直接インスタンス化するのではなく、RestClient#builderメソッドから取得します。この時、接続先としてHttpHostを与えますが、この時点でApache HttpComponentsのものだったりします。

        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200)).build()) {

Bodyに対応する部分はApache HttpComponentsのHttpEntityで表現するので、今回はMapとJacksonでJSON文字列を組み立て、あとはドキュメントに従いNStringEntityにしました。

            Map<String, String> article = new HashMap<>();
            article.put("title", "My First Post!!");
            article.put("content", "Hello Elasticsearch RestClient!!");

            HttpEntity entity = new NStringEntity(mapper.writeValueAsString(article), ContentType.APPLICATION_JSON);

そして、このHttpEntityを使ってRestClient#performRequestを呼び出します。performRequestにはいくつかオーバーロードされた種類のものがありますが、この場合は第1引数にHTTPメソッド、第2引数にエンドポイントのパス、第3引数にQueryのパラメーター、最後にHttpEntityです。レスポンスはApache HttpComponentsのResponseとなります。

            Response response = client.performRequest("PUT", "/blog/article/1", Collections.emptyMap(), entity);

あとは、Body部をJSONパースして読んだりしましょう。

            Map<String, String> responseEntity = mapper.readValue(response.getEntity().getContent(), Map.class);
            assertThat(responseEntity.get("_index"))
                    .isEqualTo("blog");
            assertThat(responseEntity.get("_type"))
                    .isEqualTo("article");

最初はこんなところ。

RestClient#performRequestには、Queryを指定したりしなかったり、HttpEntityを指定したりしなかったりするものがありますが、この他にHttpAsyncResponseConsumerを引数に取るものがあります。こちらも使用してみましょう。

HttpAsyncResponseConsumerを使うと、コールバックを受けることができるようになります。コールバックを受けるタイミングは、レスポンスが返ってきた時、コンテンツを受信した時などがあります。

で、実際には他のAPIでもHttpAsyncResponseConsumerが使われていて、内部的にはHeapBufferedAsyncResponseConsumerが使用されています。以下のコードが、近い例になるでしょう。

    @Test
    public void putDocumentWithConsumer() throws IOException {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200)).build()) {
            Map<String, String> article = new HashMap<>();
            article.put("title", "This is My Blog");
            article.put("content", "Elasticsearch is full-text search engine.");

            HttpEntity entity = new NStringEntity(mapper.writeValueAsString(article), ContentType.APPLICATION_JSON);

            Response response =
                    client.performRequest("PUT",
                            "/blog/article/2",
                            Collections.emptyMap(),
                            entity,
                            new HeapBufferedAsyncResponseConsumer());

            assertThat(response.getStatusLine().getStatusCode())
                    .isEqualTo(HttpStatus.SC_CREATED);

            Map<String, String> responseEntity = mapper.readValue(response.getEntity().getContent(), Map.class);
            assertThat(responseEntity.get("_index"))
                    .isEqualTo("blog");
            assertThat(responseEntity.get("_type"))
                    .isEqualTo("article");
        }
    }

リクエストの投げ方としては、別の系統としてRestClient#performRequestAsyncもあります。

こちらを使用すると、RestClient#performRequestAsyncの戻り値はvoidとなり、ResponseListenerの実装クラスがレスポンスを受け取るようになります。

例は、以下。

    @Test
    public void putDocumentAsync() throws IOException, InterruptedException {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200)).build()) {
            Map<String, String> article = new HashMap<>();
            article.put("title", "put Document Async");
            article.put("content", "use RestClient Async API.");

            HttpEntity entity = new NStringEntity(mapper.writeValueAsString(article), ContentType.APPLICATION_JSON);

            CountDownLatch latch = new CountDownLatch(1);

            client.performRequestAsync("PUT",
                    "/blog/article/3",
                    Collections.emptyMap(),
                    entity,
                    new ResponseListener() {
                        @Override
                        public void onSuccess(Response response) {
                            latch.countDown();

                            assertThat(response.getStatusLine().getStatusCode())
                                     .isEqualTo(HttpStatus.SC_CREATED);

                            try {
                                Map<String, String> responseEntity = mapper.readValue(response.getEntity().getContent(), Map.class);
                                assertThat(responseEntity.get("_index"))
                                        .isEqualTo("blog");
                                assertThat(responseEntity.get("_type"))
                                        .isEqualTo("article");
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        }

                        @Override
                        public void onFailure(Exception exception) {
                            latch.countDown();

                            exception.printStackTrace();
                        }
                    });

            latch.await();
        }
    }

ResponseListenerを使用する場合は、onSuccessおよびonFailureメソッドをオーバーライドし、それぞれ呼び出し成功時/失敗時の処理を実装します。

                    new ResponseListener() {
                        @Override
                        public void onSuccess(Response response) {
                            latch.countDown();

                            assertThat(response.getStatusLine().getStatusCode())
                                     .isEqualTo(HttpStatus.SC_CREATED);

                            try {
                                Map<String, String> responseEntity = mapper.readValue(response.getEntity().getContent(), Map.class);
                                assertThat(responseEntity.get("_index"))
                                        .isEqualTo("blog");
                                assertThat(responseEntity.get("_type"))
                                        .isEqualTo("article");
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        }

                        @Override
                        public void onFailure(Exception exception) {
                            latch.countDown();

                            exception.printStackTrace();
                        }
                    });

今回は、同期処理と同じ感じで実装。

続いて、idを指定してのDocument取得。

    @Test
    public void findDocument() throws IOException {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200)).build()) {
            Response response = client.performRequest("GET", "/blog/article/1");

            assertThat(response.getStatusLine().getStatusCode())
                    .isEqualTo(HttpStatus.SC_OK);

            Map<String, Object> responseEntity = mapper.readValue(response.getEntity().getContent(), Map.class);

            Map<String, String> source = (Map<String, String>) responseEntity.get("_source");
            assertThat(source.get("title"))
                    .isEqualTo("My First Post!!");
            assertThat(source.get("content"))
                    .isEqualTo("Hello Elasticsearch RestClient!!");
        }
    }

こちらは、単にエンドポイントにidを指定しているだけなので、Body(HttpEntity)は不要です。

検索。検索では、QueryStringで「q」を指定する方法で書いたので、今回の例では唯一QueryStringを使用した例になっています。

    @Test
    public void searchDocument() throws IOException {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200)).build()) {
            Map<String, String> params = new HashMap<>();
            params.put("q", "elasticsearch");

            Response response = client.performRequest("GET", "/blog/article/_search", params);

            assertThat(response.getStatusLine().getStatusCode())
                    .isEqualTo(HttpStatus.SC_OK);

            Map<String, Object> responseEntity = mapper.readValue(response.getEntity().getContent(), Map.class);

            Map<String, Object> hits = (Map<String, Object>) responseEntity.get("hits");

            int total = Integer.parseInt(hits.get("total").toString());
            assertThat(total)
                    .isGreaterThanOrEqualTo(2);

        }
    }

最後は、インデックスを削除。DELETEメソッドで。

        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200)).build()) {
            Response response = client.performRequest("DELETE", "/blog");
            assertThat(response.getStatusLine().getStatusCode())
                    .isEqualTo(HttpStatus.SC_OK);
        }

まとめ

という感じで、ElasticsearchのREST Clientを使ってみました。

こんな調子で、RestClient#performRequest/performRequestAsyncくらいしか処理のエントリポイントがないので、割とLow LevelなAPIになります。

これだと、まだJestの方が使いやすそうな気がしますが、「オフィシャルが提供を開始した」ということに意味があると思うので、今後のバージョンアップに期待しておきましょう。