Elasticsearch は、Apache Lucene をベースにしたオープンソースの検索および分析エンジンです。Elasticsearch を使用して変更データ キャプチャ (CDC) データに基づいてアプリケーションを構築する場合は、インデックス内の既存のドキュメントに対する頻繁な更新や変更を処理できるようにシステムを設計する必要があります。
このブログでは、完全更新、部分更新、スクリプト更新など、更新に使用できるさまざまなオプションについて説明します。また、ドキュメントを変更するときにElasticsearchの内部で何が起きるか、頻繁な更新がシステムの CPU 使用率にどのような影響を与えるかについても説明します。
頻繁に更新されるユースケースをよりよく理解するために、Netflix などのビデオ ストリーミング サービスの検索アプリケーションを見てみましょう。ユーザーが「政治スリラー」などの番組を検索すると、キーワードやその他のメタデータに基づいて関連する結果のセットが返されます。
番組「ハウス・オブ・カード」の Elasticsearch のサンプルドキュメントを見てみましょう。
{ "name": "House of Cards", "description": "Frank Underwood is a Democrat appointed as the Secretary of State. Along with his wife, he sets out on a quest to seek revenge from the people who betrayed him while successfully rising to supremacy.", "genres": ["drama", "thriller"], "views": 100, }
Elasticsearch では、 name
とdescription
全文検索フィールドとして使用するように検索を設定できます。タイトルごとの視聴回数を保存するviews
フィールドは、コンテンツを強化し、より人気のある番組を上位にランク付けするために使用できますviews
フィールドは、ユーザーが番組や映画のエピソードを視聴するたびに増加します。
この検索構成をNetflix規模のアプリケーションで使用すると、実行される更新の数はNetflix エンゲージメント レポートで測定されたとおり、1 分あたり数百万を簡単に超える可能性があります。レポートによると、ユーザーは 1 月から 7 月にかけて約 1,000 億時間のコンテンツを視聴しました。エピソードまたは映画あたりの平均視聴時間を 15 分と仮定すると、1 分あたりの視聴回数は平均 130 万回に達します。上記の検索構成では、各視聴で数百万規模の更新が必要になります。
多くの検索および分析アプリケーションは、特に CDC データに基づいて構築されている場合、頻繁に更新される可能性があります。
以下のコードを使用して、Elasticsearch で更新を実行する方法の一般的な例を詳しく見てみましょう。
- from elasticsearch import Elasticsearch # Connect to your Elasticsearch instance es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID you want to update index_name = 'movies' document_id = 'your_document_id' # Retrieve the current document to get the current 'views' value try: current_doc = es.get(index=index_name, id=document_id) current_views = current_doc['_source']['views'] except Exception as e: print(f"Error retrieving current document: {e}") current_views = 0 # Set a default value if there's an error # Define the update body to increment 'views' by 1 update_body = { "doc": { "views": current_views + 1 # Increment 'views' by 1 } } # Perform the update try: es.update(index=index_name, id=document_id, body=update_body) print("Document updated successfully!") except Exception as e: print(f"Error updating document: {e}")
Elasticsearch で更新を実行する場合、インデックス APIを使用して既存のドキュメントを置き換えたり、更新 API を使用してドキュメントを部分的に更新したりできます。
インデックス API はドキュメント全体を取得し、ドキュメントに変更を加えてから、ドキュメントのインデックスを再作成します。更新 API では、ドキュメント全体ではなく、変更するフィールドのみを送信します。これにより、ドキュメントのインデックスが再作成されますが、ネットワーク経由で送信されるデータの量は最小限に抑えられます。更新 API は、ドキュメントのサイズが大きく、ドキュメント全体をネットワーク経由で送信すると時間がかかる場合に特に便利です。
Python コードを使用して、インデックス API と更新 API の両方がどのように機能するかを見てみましょう。
from elasticsearch import Elasticsearch # Connect to Elasticsearch es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID index_name = "your_index" document_id = "1" # Retrieve the existing document existing_document = es.get(index=index_name, id=document_id) # Make your changes to the document existing_document["_source"]["field1"] = "new_value1" existing_document["_source"]["field2"] = "new_value2" # Call the index API to perform the full update es.index(index=index_name, id=document_id, body=existing_document["_source"])
上記のコードからわかるように、インデックス API では Elasticsearch への 2 つの個別の呼び出しが必要であり、これによりパフォーマンスが低下し、クラスターの負荷が高くなる可能性があります。
部分的な更新では内部的にreindex API が使用されますが、パフォーマンスを向上させるために 1 回のネットワーク呼び出しのみを必要とするように構成されています。
from elasticsearch import Elasticsearch # Connect to Elasticsearch es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID index_name = "your_index" document_id = "1" # Specify the fields to be updated update_fields = { "field1": "new_value1", "field2": "new_value2" } # Use the update API to perform a partial update es.update(index=index_name, id=document_id, body={"doc": update_fields})
Elasticsearch の更新 API を使用してビュー数を更新できますが、更新 API 自体では、以前の値に基づいてビュー数を増分するために使用することはできません。これは、新しいビュー数の値を設定するには、古いビュー数が必要であるためです。
強力なスクリプト言語である Painless を使用してこれを修正する方法を見てみましょう。
Painlessは Elasticsearch 用に設計されたスクリプト言語で、クエリや集計の計算、複雑な条件、データ変換などに使用できます。また、Painless では、更新クエリでスクリプトを使用して、複雑なロジックに基づいてドキュメントを変更することもできます。
以下の例では、Painless スクリプトを使用して、1 回の API 呼び出しで更新を実行し、古いビュー数の値に基づいて新しいビュー数を増やします。
from elasticsearch import Elasticsearch # Connect to your Elasticsearch instance es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID you want to update index_name = 'movies' document_id = 'your_document_id' # Define the Painless script for the update update_script = { "script": { "lang": "painless", "source": "ctx._source.views += 1" # Increment 'views' by 1 } } # Perform the update using the Painless script try: es.update(index=index_name, id=document_id, body=update_script) print("Document updated successfully!") except Exception as e: print(f"Error updating document: {e}")
Painless スクリプトは非常に直感的に理解でき、ドキュメントごとにビュー数を 1 ずつ増やすだけです。
Elasticsearch のネストされたオブジェクトは、単一の親ドキュメント内の個別のドキュメントとしてオブジェクトの配列のインデックス作成を可能にするデータ構造です。ネストされたオブジェクトは、オブジェクト内のオブジェクトなど、自然にネストされた構造を形成する複雑なデータを処理する場合に便利です。一般的な Elasticsearch ドキュメントでは、オブジェクトの配列はフラット化されていますが、ネストされたデータ型を使用すると、配列内の各オブジェクトを個別にインデックス作成およびクエリできます。
簡単なスクリプトを使用して、Elasticsearch 内のネストされたオブジェクトを更新することもできます。
from elasticsearch import Elasticsearch # Connect to your Elasticsearch instance es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID for the example index_name = 'your_index' document_id = 'your_document_id' # Specify the nested field and the updated value nested_field = "nested_field_name" updated_value = "new_value" # Define the Painless script for the update update_script = { "script": { "lang": "painless", "source": "ctx._source.nested_field_name = params.updated_value", "params": { "updated_value": updated_value } } } # Perform the update using the Update API and the Painless script try: es.update(index=index_name, id=document_id, body=update_script) print("Nested object updated successfully!") except Exception as e: print(f"Error updating nested object: {e}")
Elasticsearch のドキュメントに新しいフィールドを追加するには、インデックス操作を実行します。
Update API を使用すると、既存のドキュメントを新しいフィールドで部分的に更新できます。インデックスの動的マッピングが有効になっている場合、新しいフィールドの導入は簡単です。そのフィールドを含むドキュメントをインデックスするだけで、Elasticsearch は適切なマッピングを自動的に判断し、新しいフィールドをマッピングに追加します。
インデックスの動的マッピングが無効になっている場合は、更新マッピング API を使用する必要があります。映画インデックスに「カテゴリ」フィールドを追加してインデックス マッピングを更新する方法の例を以下に示します。
PUT /movies/_mapping { "properties": { "category": { "type": "keyword" } } }
コードはシンプルですが、データは不変のセグメントに保存されるため、Elasticsearch は内部的にこれらの更新を実行するために多くの負荷をかけています。その結果、Elasticsearch はドキュメントをその場で更新することができません。更新を実行する唯一の方法は、どの API が使用されているかに関係なく、ドキュメント全体を再インデックスすることです。
Elasticsearch は、内部で Apache Lucene を使用しています。Lucene インデックスは、1 つ以上のセグメントで構成されます。セグメントは、インデックス全体のサブセットを表す自己完結型の不変のインデックス構造です。ドキュメントが追加または更新されると、新しい Lucene セグメントが作成され、古いドキュメントはソフト削除対象としてマークされます。時間の経過とともに、新しいドキュメントが追加されたり、既存のドキュメントが更新されたりすると、複数のセグメントが蓄積されることがあります。インデックス構造を最適化するために、Lucene は定期的に小さなセグメントを大きなセグメントにマージします。
各更新操作は再インデックス操作であるため、すべての更新は基本的にソフト削除を伴う挿入になります。
更新を挿入操作として扱うと、コストに影響があります。一方では、データのソフト削除は、古いデータがしばらく保持され続けることを意味し、インデックスのストレージとメモリが膨張します。ソフト削除、再インデックス、ガベージ コレクション操作を実行すると、CPU に大きな負担がかかり、すべてのレプリカでこれらの操作を繰り返すと、その負担はさらに大きくなります。
製品が成長し、データが時間とともに変化すると、更新はより複雑になります。Elasticsearch のパフォーマンスを維持するには、クラスター内のシャード、アナライザー、トークナイザーを更新し、クラスター全体のインデックスを再作成する必要があります。実稼働アプリケーションの場合、新しいクラスターをセットアップしてすべてのデータを移行する必要があります。クラスターの移行は時間がかかり、エラーが発生しやすいため、軽視できる操作ではありません。
Elasticsearch の更新操作はシンプルなため、システムの内部で実行されている負荷の高い操作タスクが見えにくくなることがあります。Elasticsearch は各更新を挿入として扱うため、ドキュメント全体を再作成して再インデックスする必要があります。更新が頻繁に行われるアプリケーションの場合、毎分数百万件の更新が行われる Netflix の例でわかるように、これはすぐにコストが高くなる可能性があります。Elasticsearch で更新が頻繁に行われる場合は、 Bulk APIを使用して更新をバッチ処理するか (ワークロードに遅延が追加されます)、代替ソリューションを検討することをお勧めします。
Rockset はクラウド上に構築された検索および分析データベースで、Elasticsearch の代替として変更可能です。変更可能であることで人気のキーバリューストアRocksDB上に構築された Rockset は、ドキュメントをその場で更新できます。これにより、ドキュメント全体ではなく、個々のフィールドの値のみが更新され、再インデックスされます。
更新の多いワークロードについて Elasticsearch と Rockset のパフォーマンスを比較したい場合は、300 ドルのクレジットでRockset の無料トライアルを開始できます。