CLOVER🍀

That was when it all began.

Elasticsearchでベクトル検索(kNN検索/ANN)を試す

これは、なにをしたくて書いたもの?

少し前にApache Luceneでベクトル検索(kNN検索、ANN)を試してみました。

Apache Luceneでベクトル検索(kNN検索)を試す - CLOVER🍀

Apache LuceneでkNN検索とANN+HNSWを使い分ける(Codecを使ったHNSWのパラメーター設定付き) - CLOVER🍀

今度はElasticsearchで試してみたいと思います。

Elasticsearchのベクトル検索

Elasticsearchのベクトル検索に関するドキュメントはこちら。

k-nearest neighbor (kNN) search | Elasticsearch Guide [8.14] | Elastic

ElasticsearchでkNN検索ができるようになったのは、8.0からのようです。

ベクトル検索を行うためには、テキスト埋め込みができることが前提です。

機械学習機能に含まれるNLPの機能を使用するには、プラチナ以上のサブスクリプションが必要です。

Subscriptions | Elastic Stack Products & Support | Elastic

ElasticsearchのNLPの機能を使うとドキュメントの登録やクエリーの実行時にテキストから埋め込みを作成したり、ハイブリッド検索と
組み合わせることができるようです。

Semantic search | Elasticsearch Guide [8.14] | Elastic

また、ELSERと呼ばれるElasticによってトレーニングされたモデルを使ったり、OpenAIやHugging Faceのような推論APIとの統合も
できるようです。

今回はこういったものは使わず、テキスト埋め込みに関しては自前でどうにかしたいと思います。

話を戻して、Elasticsearchでのベクトル検索はkNN検索です。

k-nearest neighbor (kNN) search / kNN methods

次の2種類の実行方法をサポートしています。

ANNを使うには、まずドキュメントにdense_vector型のフィールドが定義されている必要があります。

Dense vector field type | Elasticsearch Guide [8.14] | Elastic

ここでベクトル類似度関数(similarity)を、以下のいずれかから指定しておく必要があります。

デフォルトはコサイン類似度です。

検索自体は、通常の検索APIのknnパラメーターで行います。

Search API / Request body / knn

ちょっと変わったポイントや、気になるところをいくつか。

knn側にもsimilarityを指定できますが、こちらはドキュメントのスコアには影響せずboostとして使われるようです。
ドキュメントのスコアにはdense_vectorフィールド定義時のsimilarityが使われるようです。

similarity

(Optional, float) The minimum similarity required for a document to be considered a match. The similarity value calculated relates to the raw similarity used. Not the document score. The matched documents are then scored according to similarity and the provided boost is applied.

パラメーターkでは検索候補の数を指定します。デフォルト値はsizeと同じで、検索結果として取得するドキュメント数です(デフォルト10)。

ところで、Elasticsearchのインデックスは複数のシャードから構成されています。これとどういう関係になるかということですが、
シャードごとに検索候補とする数はnum_candidatesで指定するようです。デフォルト値は、kの1.5倍と10,000の小さい方の値です。

num_candidatesを大きくすることでより多くの候補から検索を行い、最終的に返すドキュメントはk個に選ばられるということになります。
よってより良い検索結果になる可能性がありますが、トレードオフとして検索速度が遅くなります。

k-nearest neighbor (kNN) search / Approximate kNN / Tune approximate kNN for speed or accuracy

kNN検索に対してフィルタリングも可能ですが、kNN検索の後にフィルタリングするのではなく、kNN検索の最中にフィルタリングする
というのがポイントです。どういうことかというと、指定したkやnum_candidatesを満たすように追加で探索を行う可能性があるからです。

ANNの場合、パフォーマンスの低下を避けるためにフィルタリングされたドキュメント数やHNSWグラフの探索ノード数に応じて
検索処理が切り替わります。

knnとqueryの各オプションを併用することで、ハイブリッド検索が可能です。

k-nearest neighbor (kNN) search / Approximate kNN / Combine approximate kNN with other features

セマンティック検索も可能ですが、NLPが必要になります。

k-nearest neighbor (kNN) search / Approximate kNN / Perform semantic search

最後に、インデックス作成時の考慮事項としてHNSWのパラメーターがあります。

k-nearest neighbor (kNN) search / Approximate kNN / Indexing considerations

dense_vector型のフィールドを定義する際の、index_optionsです。これでkNN検索の調整を行うと検索精度の向上ができますが、代償として
インデックスの作成速度が低下します。

Dense vector field type | Elasticsearch Guide [8.14] | Elastic

設定できるパラメーターの意味を書いていきます。

  • type … 使用するkNNのアルゴリズム
  • m … HNSW グラフで各ノードが接続する隣接ノード数。hnswおよびint8_hnswの型のインデックスにのみ適用され、デフォルト値は16
  • ef_construction … それぞれの新規ノードに対して近傍リストを組み立てる際に追跡する候補の数。hnswおよびint8_hnswの型のインデックスにのみ適用され、デフォルト値は100
  • confidence_interval … int8_hnswおよびint8_flatの型のインデックスにのみ適用され、ベクトルを量子化する時に使用する信頼区間を設定する。0.90から1.0までの値が指定可能で、デフォルト値は 1 / (ベクトルの次元数 + 1)

というわけで、今回はテキスト埋め込みは自前でどうにかする+ANNで検索するというお題でやってみたいと思います。

環境

今回の環境はこちら。Elasticsearchは192.168.33.10のIPアドレスで動作しているものとします。

$ curl localhost:9200
{
  "name" : "myserver",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "U-l92bbTTFKXbRhOIrzEGg",
  "version" : {
    "number" : "8.14.1",
    "build_flavor" : "default",
    "build_type" : "deb",
    "build_hash" : "93a57a1a76f556d8aee6a90d1a95b06187501310",
    "build_date" : "2024-06-10T23:35:17.114581191Z",
    "build_snapshot" : false,
    "lucene_version" : "9.10.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

設定は、シングルノードで認証なしの緩いものにしています。

$ sudo grep -vE '^#|^$' /etc/elasticsearch/elasticsearch.yml
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
discovery.type: "single-node"
xpack.security.enabled: false
xpack.security.enrollment.enabled: true
xpack.security.http.ssl:
  enabled: true
  keystore.path: certs/http.p12
xpack.security.transport.ssl:
  enabled: true
  verification_mode: certificate
  keystore.path: certs/transport.p12
  truststore.path: certs/transport.p12
http.host: 0.0.0.0

また、Elasticsearchへのアクセスおよびテキスト埋め込みにはPythonを使用します。

$ python3 --version
Python 3.10.12


$ pip3 --version
pip 22.0.2 from /usr/lib/python3/dist-packages/pip (python 3.10)

テキスト埋め込みを行うREST APIを作成する

テキスト埋め込みは、Sentence TransformersとFastAPIを使った簡単なAPIで行いたいと思います。

$ pip3 install sentence-transformers fastapi uvicorn[standard]

インストールしたライブラリーの一覧。

$ pip3 list
Package                  Version
------------------------ ----------
annotated-types          0.7.0
anyio                    4.4.0
certifi                  2024.6.2
charset-normalizer       3.3.2
click                    8.1.7
dnspython                2.6.1
email_validator          2.2.0
exceptiongroup           1.2.1
fastapi                  0.111.0
fastapi-cli              0.0.4
filelock                 3.15.4
fsspec                   2024.6.1
h11                      0.14.0
httpcore                 1.0.5
httptools                0.6.1
httpx                    0.27.0
huggingface-hub          0.23.4
idna                     3.7
Jinja2                   3.1.4
joblib                   1.4.2
markdown-it-py           3.0.0
MarkupSafe               2.1.5
mdurl                    0.1.2
mpmath                   1.3.0
networkx                 3.3
numpy                    1.26.4
nvidia-cublas-cu12       12.1.3.1
nvidia-cuda-cupti-cu12   12.1.105
nvidia-cuda-nvrtc-cu12   12.1.105
nvidia-cuda-runtime-cu12 12.1.105
nvidia-cudnn-cu12        8.9.2.26
nvidia-cufft-cu12        11.0.2.54
nvidia-curand-cu12       10.3.2.106
nvidia-cusolver-cu12     11.4.5.107
nvidia-cusparse-cu12     12.1.0.106
nvidia-nccl-cu12         2.20.5
nvidia-nvjitlink-cu12    12.5.82
nvidia-nvtx-cu12         12.1.105
orjson                   3.10.5
packaging                24.1
pillow                   10.4.0
pip                      22.0.2
pydantic                 2.8.0
pydantic_core            2.20.0
Pygments                 2.18.0
python-dotenv            1.0.1
python-multipart         0.0.9
PyYAML                   6.0.1
regex                    2024.5.15
requests                 2.32.3
rich                     13.7.1
safetensors              0.4.3
scikit-learn             1.5.0
scipy                    1.14.0
sentence-transformers    3.0.1
setuptools               59.6.0
shellingham              1.5.4
sniffio                  1.3.1
starlette                0.37.2
sympy                    1.12.1
threadpoolctl            3.5.0
tokenizers               0.19.1
torch                    2.3.1
tqdm                     4.66.4
transformers             4.42.3
triton                   2.3.1
typer                    0.12.3
typing_extensions        4.12.2
ujson                    5.10.0
urllib3                  2.2.2
uvicorn                  0.30.1
uvloop                   0.19.0
watchfiles               0.22.0
websockets               12.0

REST APIになるスクリプトはこちら。

api.py

from fastapi import FastAPI
from pydantic import BaseModel
import os
from sentence_transformers import SentenceTransformer

app = FastAPI()

class EmbeddingRequest(BaseModel):
    model: str
    text: str
    normalize: bool = False

class EmbeddingResponse(BaseModel):
    model: str
    embedding: list[float]
    dimension: int

@app.post("/embeddings/encode")
def encode(request: EmbeddingRequest) -> EmbeddingResponse:
    sentence_transformer_model = SentenceTransformer(
        request.model,
        device=os.getenv("EMBEDDING_API_DEVICE", "cpu")
    )

    embeddings = sentence_transformer_model.encode(sentences=[request.text], normalize_embeddings=request.normalize)
    embedding = embeddings[0]

    # numpy array to float list
    embedding_as_float = embedding.tolist()

    return EmbeddingResponse(
        model=request.model,
        embedding=embedding_as_float,
        dimension=sentence_transformer_model.get_sentence_embedding_dimension()
    )

リクエストにはモデル、埋め込み対象のテキストを指定します。

起動。

$ uvicorn api:app

こちらのAPIを使ってテキスト埋め込みを行います。

Elasticsearchでベクトル検索を行う

それでは、Elasticsearchでベクトル検索を行うプログラムを作成します。

まずはライブラリーのインストール。

$ pip3 install elasticsearch
$ pip3 install pytest

インストールされたライブラリーの一覧。

$ pip3 list
Package           Version
----------------- --------
certifi           2024.6.2
elastic-transport 8.13.1
elasticsearch     8.14.0
exceptiongroup    1.2.1
iniconfig         2.0.0
packaging         24.1
pip               22.0.2
pluggy            1.5.0
pytest            8.2.2
setuptools        59.6.0
tomli             2.0.1
urllib3           2.2.2

pytestをインストールしていますが、ここから先はすべてテストコードとして作成します。

配置場所はこちらで。

$ mkdir tests && touch tests/__init__.py
テキスト埋め込みを行うREST APIにアクセスするクライアントを作成する

まずは作成したテキスト埋め込みを行うREST APIにアクセスする、クライアントプログラムを作成します。

tests/embedding_client.py

import json
import urllib.request

class EmbeddingRequest:
    model: str
    text: str
    normalize: bool

    def __init__(self, model: str, text: str, normalize: bool = False) -> None:
        self.model = model
        self.text = text
        self.normalize = normalize

class EmbeddingResponse:
    model: str
    embeddings: list[float]
    dimension: int

    def __init__(self, model: str, embedding: list[float], dimension: int) -> None:
        self.model = model
        self.embeddings = embedding
        self.dimension = dimension

    @classmethod
    def from_dict(cls, d: dict) -> "EmbeddingResponse":
        return cls(**d)

class EmbeddingClient:
    host: str
    port: int

    def __init__(self, host: str, port: int) -> None:
        self.host = host
        self.port = port

    def execute(self, request: EmbeddingRequest) -> EmbeddingResponse:
        json_request = json.dumps(request.__dict__).encode("utf-8")

        req = urllib.request.Request(f"http://{self.host}:{self.port}/embeddings/encode")
        req.add_header("Content-Type", "application/json")

        with urllib.request.urlopen(req, json_request) as f:
            json_response = json.loads(f.read().decode("utf-8"))

            return EmbeddingResponse.from_dict(json_response)

リクエストとレスポンスのモデルはREST APIを作成した時とほぼ同じですが、辞書との変換処理をつけています。

class EmbeddingRequest:
    model: str
    text: str
    normalize: bool

    def __init__(self, model: str, text: str, normalize: bool = False) -> None:
        self.model = model
        self.text = text
        self.normalize = normalize

class EmbeddingResponse:
    model: str
    embeddings: list[float]
    dimension: int

    def __init__(self, model: str, embedding: list[float], dimension: int) -> None:
        self.model = model
        self.embeddings = embedding
        self.dimension = dimension

    @classmethod
    def from_dict(cls, d: dict) -> "EmbeddingResponse":
        return cls(**d)

REST APIを呼び出しているのはこちら。

    def execute(self, request: EmbeddingRequest) -> EmbeddingResponse:
        json_request = json.dumps(request.__dict__).encode("utf-8")

        req = urllib.request.Request(f"http://{self.host}:{self.port}/embeddings/encode")
        req.add_header("Content-Type", "application/json")

        with urllib.request.urlopen(req, json_request) as f:
            json_response = json.loads(f.read().decode("utf-8"))

            return EmbeddingResponse.from_dict(json_response)

テストも書いておきましょう。

tests/test_embedding_client.py

from tests.embedding_client import EmbeddingClient, EmbeddingRequest


def test_embeddings_client() -> None:
    client = EmbeddingClient("localhost", 8000)

    request = EmbeddingRequest(model="intfloat/multilingual-e5-base", text="query: Hello World")
    response = client.execute(request)

    assert response.model == "intfloat/multilingual-e5-base"
    assert len(response.embeddings) == 768
    assert response.dimension == 768

使用するモデルはintfloat/multilingual-e5-baseとしていますが、Elasticsearchを使ったベクトル検索時もこちらを使うことにします。

intfloat/multilingual-e5-base · Hugging Face

Elasticsearchの検索APIを呼び出す

それでは、Elasticsearchの検索APIを呼び出すテストコードを書いていきます。

使用するElasticsearchのAPIはこちらです。

Search API | Elasticsearch Guide [8.14] | Elastic

knnパラメーターでkNN検索(ANN)を行います。

以降、テストコードを準備書いていきます。最初はElasticsearchへアクセスするクライアントのインスタンス、インデックス名、
テキスト埋め込みのREST APIにアクセスするクライアントのインスタンスを作成。

tests/test_elasticsearch_knn.py

from tests.embedding_client import EmbeddingClient, EmbeddingRequest

from elasticsearch import Elasticsearch, NotFoundError
import pytest

client = Elasticsearch("http://192.168.33.10:9200")

index_name = "my_index"

embedding_http_client = EmbeddingClient("localhost", 8000)

次に登録するドキュメントに相当するクラスを作成します。お題は映画にしました。

class Movie:
    name: str
    description: str
    author: str
    year: int

    def __init__(self, name: str, description: str, author: str, year: int) -> None:
        self.name = name
        self.description = description
        self.author = author
        self.year = year

    def to_dict_with_vector(self) -> dict:
        d = self.__dict__
        d["description_vector"] = to_vector(f"passage: {self.description}")
        return d

インスタンスの内容を辞書に変換し、さらにこの時にテキスト埋め込みの結果を追加します。

passage:というのはintfloat/multilingual-e5でドキュメントに指定する接頭辞です。

テキスト埋め込みを行う処理はこちら。

def to_vector(text: str) -> list[float]:
    request = EmbeddingRequest(model="intfloat/multilingual-e5-base", text=text)
    response = embedding_http_client.execute(request)
    return response.embeddings

先ほどのテストで使ったように、使用するモデルはintfloat/multilingual-e5-baseとしています。

intfloat/multilingual-e5-base · Hugging Face

テストの開始時にインデックスを削除・作成するようにします。この時に、インデックスのマッピングも定義します。

@pytest.fixture
def setup() -> None:
    try:
        client.indices.exists(index=index_name)
        client.indices.delete(index=index_name)
    except NotFoundError:
        pass

    mappings = {
        "properties": {
            "description_vector": {
                "type": "dense_vector",
                "dims": 768,
                "index": True,
                "similarity": "l2_norm",
                "index_options": {
                    "type": "int8_hnsw",
                    "m": 16,
                    "ef_construction": 100
                }
            }
        }
    }

    client.indices.create(index=index_name, mappings=mappings)

こちらに沿ってdense_vector型のフィールドを定義しています。

Dense vector field type | Elasticsearch Guide [8.14] | Elastic

次元数は768、ベクトル類似度関数はユークリッド距離、index_optionsはデフォルト値ですが明示的に指定しています。マッピングの
定義はこちらも参考に。

k-nearest neighbor (kNN) search | Elasticsearch Guide [8.14] | Elastic

登録するデータです。これはQdrantのチュートリアルを元にしています。

Semantic Search 101 - Qdrant

def create_movies() -> list[Movie]:
    return [
           Movie(
               "The Time Machine",
               "A man travels through time and witnesses the evolution of humanity.",
               "H.G. Wells",
               1895
           ),
        Movie(
            "Ender's Game",
            "A young boy is trained to become a military leader in a war against an alien race.",
            "Orson Scott Card",
            1985
        ),
        Movie(
            "Brave New World",
            "A dystopian society where people are genetically engineered and conditioned to conform to a strict social hierarchy.",
            "Aldous Huxley",
            1932
        ),
        Movie(
            "The Hitchhiker's Guide to the Galaxy",
            "A comedic science fiction series following the misadventures of an unwitting human and his alien friend.",
            "Douglas Adams",
            1979
        ),
        Movie(
            "Dune",
            "A desert planet is the site of political intrigue and power struggles.",
            "Frank Herbert",
            1965
        ),
        Movie(
            "Foundation",
            "A mathematician develops a science to predict the future of humanity and works to save civilization from collapse.",
            "Isaac Asimov",
            1951
        ),
        Movie(
            "Snow Crash",
            "A futuristic world where the internet has evolved into a virtual reality metaverse.",
            "Neal Stephenson",
            1992
        ),
        Movie(
            "Neuromancer",
            "A hacker is hired to pull off a near-impossible hack and gets pulled into a web of intrigue.",
            "William Gibson",
            1984
        ),
        Movie(
            "The War of the Worlds",
            "A Martian invasion of Earth throws humanity into chaos.",
            "H.G. Wells",
            1898
        ),
        Movie(
            "The Hunger Games",
            "A dystopian society where teenagers are forced to fight to the death in a televised spectacle.",
            "Suzanne Collins",
            2008
        ),
        Movie(
            "The Andromeda Strain",
            "A deadly virus from outer space threatens to wipe out humanity.",
            "Michael Crichton",
            1969
        ),
        Movie(
            "The Left Hand of Darkness",
            "A human ambassador is sent to a planet where the inhabitants are genderless and can change gender at will.",
            "Ursula K. Le Guin",
            1969
        ),
        Movie(
            "The Three-Body Problem",
            "Humans encounter an alien civilization that lives in a dying system.",
            "Liu Cixin",
            2008
        )
    ]

それでは、ANNを使って検索を行います。

def test_ann_search(setup):
    movies = create_movies()

    for movie in movies:
        document = movie.to_dict_with_vector()
        client.index(index=index_name, document=document)

    results = client.search(
        index=index_name,
        size=3,
        knn={
            "field": "description_vector",
            "query_vector": to_vector(f"query: alien invasion"),
            "k": len(movies)  ## all documents
        }
    )

    hits = results["hits"]["hits"]

    assert len(hits) == 3
    assert hits[0]["_source"]["name"] == "The Hitchhiker's Guide to the Galaxy"
    assert hits[0]["_source"]["year"] == 1979
    assert hits[1]["_source"]["name"] == "The Three-Body Problem"
    assert hits[1]["_source"]["year"] == 2008
    assert hits[2]["_source"]["name"] == "The Andromeda Strain"
    assert hits[2]["_source"]["year"] == 1969

Elasticsearchの検索APIはこちらで、

Search API | Elasticsearch Guide [8.14] | Elastic

対応するElasticsearchのPythonクライアントでのメソッドはこちら。

Elasticsearch#search

ANNに関するパラメーターは、knnで指定している箇所ですね。fieldは検索対象とするフィールド、query_vectorは検索で使用する
ベクトル、kは近似の際に検索対象とするドキュメントの数です。

        knn={
            "field": "description_vector",
            "query_vector": to_vector(f"query: alien invasion"),
            "k": len(movies)  ## all documents
        }

k-nearest neighbor (kNN) search / Approximate kNN

query:はintfloat/multilingual-e5で検索時に使う接頭辞です。

ところで、Apache Luceneを直接使った時は指定するkの値がドキュメント数と同じかどうかでkNN検索とANNを使い分けるようになって
いました。

Apache LuceneでkNN検索とANN+HNSWを使い分ける(Codecを使ったHNSWのパラメーター設定付き) - CLOVER🍀

Elasticsearchの場合、どうなっているかもうわからないのでkをドキュメント数と同じにしてもANNと捉えておきます…。

そもそも正確なkNN検索をしたい場合は、全然違う方法が提示されていますからね。

k-nearest neighbor (kNN) search / Exact kNN

ところで、ElasticsearchのPythonクライアントのドキュメントを見ると、Elasticsearch#knn_searchというものがあって、これは
なんだろう?と思っていたのですが。

Elasticsearch#knn_search

指定できるパラメーターも中途半端でどうなっているのかな?と見ていたのですが、Elasticsearch側で非推奨のAPIになっているみたいなので
気にしなくてよさそうです。

kNN search API | Elasticsearch Guide [8.14] | Elastic

最後にフィルターもつけてみましょう。

def test_ann_search_with_filter(setup):
    movies = create_movies()

    for movie in movies:
        document = movie.to_dict_with_vector()
        client.index(index=index_name, document=document)

    results = client.search(
        index=index_name,
        size=3,
        knn={
            "field": "description_vector",
            "query_vector": to_vector(f"query: alien invasion"),
            "k": len(movies),  ## all documents
            "filter": {
                "query_string": {
                    "query": "year: [2000 TO *]"
                }
            }
        }
    )

    hits = results["hits"]["hits"]

    assert len(hits) == 2
    assert hits[0]["_source"]["name"] == "The Three-Body Problem"
    assert hits[0]["_source"]["year"] == 2008
    assert hits[1]["_source"]["name"] == "The Hunger Games"
    assert hits[1]["_source"]["year"] == 2008

こんなところでしょうか。

おわりに

ElasticsearchでkNN検索(ANN)を試してみました。

Apache Luceneがベースなので、前に試していた分だけ使い方はイメージしやすかったのですが、Elasticsearch固有のパラメーターなどが
あって少し異なる印象を受けました。単純にApache Luceneと同じ、というわけにはいかなさそうです。

今回はANNのみとしましたが、完全なkNN検索やハイブリッド検索などいろいろ試してみたいものがあるなとも思うのですが、
それは気が向いたらまたということで。

それにしても、Python慣れしていないのでプログラムを書くのにとても苦労しました…。