PostgreSQL、MongoDB、DynamoDB などのソース システムから下流のシステムにストリーミング データを管理し、リアルタイム検索や分析を行うことは、多くのチームにとって課題です。データ フローには複雑な ETL ツールや自己管理型の統合が含まれることが多く、更新や削除などの大量の書き込みによって CPU が消費されたり、エンド アプリケーションのパフォーマンスに影響が及んだりしないようにする必要があります。
Elasticsearchのようなシステムでは、エンジニアはストリーミング データを効率的に取り込むために、基礎となるアーキテクチャに関する詳細な知識を持っている必要があります。Elasticsearch は、データが頻繁に変更されないログ分析用に設計されているため、トランザクション データを処理する際には追加の課題が生じます。
一方、Rockset はクラウドネイティブ データベースであり、システムにデータを取り込むために必要なツールやオーバーヘッドの多くを排除します。Rockset はリアルタイム検索と分析を目的として構築されているため、フィールド レベルの可変性も考慮して設計されており、挿入、更新、削除の処理に必要な CPU を削減します。
このブログでは、 Elasticsearch と Rockset がデータ取り込みをどのように処理するかを比較対照し、これらのシステムをリアルタイム分析に使用するための実用的なテクニックを紹介します。
Elasticsearch にデータを取り込む方法は多数ありますが、ここではリアルタイム検索と分析のための一般的な 3 つの方法について説明します。
Logstash JDBC 入力プラグインを使用してリレーショナル データベースから Elasticsearch にデータを取り込む
Kafka Elasticsearch Service Sink Connector を使用して、Kafka から Elasticsearch にデータを取り込む
REST APIとクライアントライブラリを使用して、アプリケーションからElasticsearchに直接データを取り込む
Logstash JDBC 入力プラグインを使用して、リレーショナル データベースから Elasticsearch にデータを取り込みます。Logstash JDBC 入力プラグインを使用すると、 PostgreSQL や MySQL などのリレーショナル データベースから Elasticsearch にデータをオフロードして、検索や分析を行うことができます。
Logstash は、データを Elasticsearch に送信する前に取り込んで変換するイベント処理パイプラインです。Logstash は、PostgreSQL や MySQL などのリレーショナル データベースを定期的にポーリングして挿入と更新を行うJDBC 入力プラグインを提供します。このサービスを使用するには、リレーショナル データベースが、Logstash が読み取り、発生した変更を判断できるタイムスタンプ付きのレコードを提供する必要があります。
この取り込みアプローチは挿入と更新には適していますが、削除には追加の考慮が必要です。これは、Logstash が OLTP データベースで何が削除されたかを判断することができないためです。ユーザーは、ソフト削除を実装することでこの制限を回避できます。ソフト削除では、削除されたレコードにフラグが適用され、クエリ時にそのフラグを使用してデータがフィルターされます。または、リレーショナル データベースを定期的にスキャンして最新のレコードにアクセスし、Elasticsearch でデータを再インデックス化することもできます。
Kafka Elasticsearch Sink Connectorを使用して、Kafka から Elasticsearch にデータを取り込みます。また、リアルタイムの検索と分析のために、Kafka などのイベント ストリーミング プラットフォームを使用してソース システムから Elasticsearch にデータを送信することも一般的です。
Confluent と Elastic は、マネージド Confluent Kafka と Elastic Elasticsearch の両方を使用している企業が利用できるKafka Elasticsearch Service Sink Connectorのリリースで提携しました。コネクタを使用するには、追加のツールである Kafka Connect をインストールして管理する必要があります。
コネクタを使用すると、Kafka の各トピックを Elasticsearch の単一のインデックス タイプにマップできます。インデックス タイプとして動的型付けを使用する場合、Elasticsearch はフィールドの追加、フィールドの削除、タイプの変更などの一部のスキーマ変更をサポートします。
Kafka を使用する際に発生する課題の 1 つは、アナライザー、トークナイザー、またはインデックス付けされたフィールドを変更するときに、Elasticsearch でデータを再インデックスする必要があることです。これは、マッピングが定義されると変更できないためです。データの再インデックスを実行するには、元のインデックスと新しいインデックスに二重に書き込み、元のインデックスから新しいインデックスにデータを移動してから、元のコネクタ ジョブを停止する必要があります。
Confluent または Elastic のマネージド サービスを使用しない場合は、Logstash 用のオープンソース Kafka プラグインを使用してデータを Elasticsearch に送信できます。
REST API とクライアント ライブラリを使用して、アプリケーションから Elasticsearch に直接データを取り込みます。Elasticsearch では、Java、Javascript、Ruby、Go、Python などのサポートされているクライアント ライブラリを使用して、アプリケーションから REST API 経由でデータを直接取り込むことができます。クライアント ライブラリを使用する際の課題の 1 つは、Elasticsearch が取り込み負荷を処理できない場合に備えて、キューイングとバック プレッシャーを使用するように構成する必要があることです。キューイング システムが導入されていないと、Elasticsearch へのデータ損失が発生する可能性があります。
Elasticsearch には、更新と削除の処理に使用できるUpdate APIがあります。Update API は、ネットワーク トリップの数とバージョン競合の可能性を減らします。Update API は、インデックスから既存のドキュメントを取得し、変更を処理してから、データを再度インデックスします。ただし、Elasticsearch はインプレース更新や削除を提供していません。そのため、ドキュメント全体を再度インデックスする必要があり、CPU を集中的に使用する操作になります。
内部的には、Elasticsearch データは Lucene インデックスに保存され、そのインデックスは小さなセグメントに分割されます。各セグメントは不変であるため、ドキュメントを変更することはできません。更新が行われると、古いドキュメントは削除対象としてマークされ、新しいドキュメントがマージされて新しいセグメントが形成されます。更新されたドキュメントを使用するには、すべてのアナライザーを実行する必要があり、CPU 使用率が増加する可能性があります。常に変化するデータを持つ顧客にとって、インデックスのマージが Elasticsearch の全体的なコンピューティング料金のかなりの部分を消費することはよくあります。
必要なリソースの量を考慮すると、Elasticsearch への更新回数を制限することを Elastic は推奨しています。Elasticsearch のリファレンス カスタマーであるBol.comは、自社の e コマース プラットフォームの一部として、サイト検索に Elasticsearch を使用していました。Bol.com では、コンテンツ、価格、在庫状況の変更など、1 日あたり約 70 万件の更新が行われていました。当初、同社は変更が発生するたびに同期を維持するソリューションを求めていました。しかし、更新が Elasticsearch システムのパフォーマンスに与える影響を考慮して、15 ~ 20 分の遅延を許容することにしました。Elasticsearch へのドキュメントのバッチ処理により、一貫したクエリ パフォーマンスが確保されました。
Elasticsearch では、古いドキュメントの削除やスペースの再利用に関連する課題が発生する可能性があります。
Elasticsearch は、インデックスに多数のセグメントがある場合、またはセグメント内に削除対象としてマークされたドキュメントが多数ある場合に、バックグラウンドでセグメントのマージを実行します。セグメントのマージとは、既存のセグメントから新しく形成されたセグメントにドキュメントをコピーし、残りのセグメントを削除することです。残念ながら、Lucene はマージする必要があるセグメントのサイズ設定が得意ではないため、パフォーマンスと安定性に影響を与える不均一なセグメントを作成する可能性があります。
これは、Elasticsearch がすべてのドキュメントのサイズが均一であると想定し、削除されたドキュメントの数に基づいてマージの決定を行うためです。マルチテナント アプリケーションでよくあるように、ドキュメントのサイズが異なる場合、一部のセグメントのサイズが他のセグメントよりも速く大きくなり、アプリケーションの最大の顧客のパフォーマンスが低下します。このような場合、唯一の解決策は大量のデータを再インデックスすることです。
Elasticsearch は、レプリケーションにプライマリ バックアップ モデルを使用します。プライマリ レプリカは、受信した書き込み操作を処理し、その操作をレプリカに転送します。各レプリカはこの操作を受け取り、データをローカルで再度再インデックスします。つまり、すべてのレプリカが独立して、同じドキュメントを何度も再インデックスするために、コストのかかるコンピューティング リソースを費やします。レプリカが n 個ある場合、Elastic は同じドキュメントをインデックスするために n 倍の CPU を費やします。これにより、更新または挿入が発生したときに再インデックスする必要があるデータの量が増大する可能性があります。
Elasticsearch では Update API を使用できますが、一般的にはBulk APIを使用して頻繁な変更をバッチ処理することをお勧めします。Bulk API を使用する場合、エンジニアリング チームはシステムへの更新を効率化するためにキューを作成して管理する必要があることがよくあります。
キューは Elasticsearch から独立しており、設定と管理が必要です。キューは、Elasticsearch への影響を制限するために、特定の時間間隔 (たとえば 15 分) 内にシステムへの挿入、更新、削除を統合します。キューイング システムは、挿入率が高い場合にスロットルを適用して、アプリケーションの安定性を確保します。キューは更新には役立ちますが、データの完全な再インデックスを必要とするデータ変更が大量にあるかどうかを判断するのは得意ではありません。システムへの更新が大量にある場合、これはいつでも発生する可能性があります。Elastic を大規模に実行するチームでは、専任の運用メンバーがキューを毎日管理および調整するのが一般的です。
前のセクションで述べたように、大量の更新があったり、インデックス マッピングを変更する必要がある場合は、データの再インデックスが行われます。 再インデックスはエラーが発生しやすく、クラスターがダウンする可能性があります。さらに恐ろしいのは、再インデックスはいつでも発生する可能性があることです。
マッピングを変更したい場合は、再インデックスが行われる時間をさらに制御できます。Elasticsearch には、新しいインデックスを作成するための再インデックス API と、新しいインデックスの作成時にダウンタイムが発生しないようにするためのエイリアス API があります。エイリアス API を使用すると、新しいインデックスの作成時にクエリがエイリアスまたは古いインデックスにルーティングされます。新しいインデックスの準備ができると、エイリアス API は新しいインデックスからデータを読み取ります。
エイリアス API では、新しいインデックスを最新のデータと同期させるのが依然として困難です。これは、Elasticsearch が 1 つのインデックスにしかデータを書き込めないためです。そのため、新しいインデックスと古いインデックスに二重に書き込むように、データ パイプラインの上流を構成する必要があります。
Rockset は組み込みのコネクタを使用して、データをソース システムと同期させます。Rockset のマネージド コネクタは、データ ソースの種類ごとに調整されているため、2 秒以内にデータを取り込んでクエリ可能にすることができます。これにより、遅延を追加したり、15 分ごとにマイクロバッチでしかデータを取り込めない手動パイプラインを回避できます。
大まかに言うと、Rockset は OLTP データベース、データ ストリーム、データ レイク、データ ウェアハウスへの組み込みコネクタを提供します。その仕組みは次のとおりです。
OLTP データベースへの組み込みコネクタRockset は、OLTP データベース内のテーブルの初期スキャンを実行し、 CDC ストリームを使用して最新のデータとの同期を維持し、ソース システムによって生成されてから 2 秒以内にデータをクエリできるようになります。
データ ストリームへの組み込みコネクタKafkaや Kinesis などのデータ ストリームを使用すると、Rockset は、Kafka や Kinesis での調整を必要としないプルベースの統合を使用して、新しいトピックを継続的に取り込みます。
データ レイクとウェアハウスへの組み込みコネクタRockset は、更新を常に監視し、S3 バケットなどのデータ レイクから新しいオブジェクトを取り込み、リアルタイム分析を行うために、チームがリアルタイム ストリームをデータ レイクのデータと結合したいと考えることがよくあります。
Rockset は、複数のマシン間でデータを並列に効率的にインデックス化するように最適化された分散アーキテクチャを備えています。
Rockset はドキュメント シャード データベースであるため、ドキュメントを分割してさまざまなフィールドを異なるマシンに送信するのではなく、ドキュメント全体を 1 台のマシンに書き込みます。このため、更新と削除の主キー _id に基づいて、挿入用に新しいドキュメントを追加したり、既存のドキュメントを検索したりすることがすばやくできます。
Elasticsearch と同様に、Rockset はクエリ時にインデックスを使用してデータを迅速かつ効率的に取得します。ただし、他のデータベースや検索エンジンとは異なり、Rockset は取り込み時にデータを統合インデックスにインデックスします。統合インデックスは、列ストア、検索インデックス、行ストアを組み合わせたインデックスです。統合インデックスは、フィールド内のすべての値を一連のキーと値のペアとして保存します。以下の例では、ドキュメントとそれが Rockset にどのように保存されるかを確認できます。
Rockset は、内部的には RocksDB を使用します。これは、変更が簡単になる高性能なキー値ストアです。RocksDB は、異なるキー間でのアトミック書き込みと削除をサポートします。ドキュメントのname
フィールドに更新があった場合、インデックスごとに 1 つずつ、正確に 3 つのキーを更新する必要があります。ドキュメント内の他のフィールドのインデックスは影響を受けません。つまり、Rockset は、ドキュメント全体のインデックスを毎回更新するサイクルを無駄にすることなく、更新を効率的に処理できます。
ネストされたドキュメントと配列も Rockset のファーストクラスのデータ型であるため、同じ更新プロセスがそれらにも適用され、JSON や Avro などの最新の形式で保存されたデータの更新に Rockset は適しています。
Rockset のチームは、リアルタイム分析ワークロードで一般的なパターンである大量の書き込みと大量の読み取りを処理するために、RocksDB 用のカスタム拡張機能もいくつか構築しました。これらの拡張機能の 1 つは、クエリ コンピューティングとインデックス作成コンピューティングを RocksDB Cloud に明確に分離するリモート コンパクションです。これにより、Rockset は書き込みが読み取りに干渉することを回避できます。これらの機能強化により、Rockset は顧客のニーズに応じて書き込みをスケーリングし、バックグラウンドで変更が発生している場合でも最新のデータをクエリに使用できるようにすることができます。
Rockset のユーザーは、デフォルトの _id フィールドを使用するか、特定のフィールドを主キーとして指定することができます。このフィールドにより、ドキュメント全体またはドキュメントの一部を上書きすることができます。Rockset と Elasticsearch の違いは、Rockset ではドキュメント全体を再インデックスしなくても、個々のフィールドの値を更新できることです。
Rockset API を使用してコレクション内の既存のドキュメントを更新するには、Patch Documents エンドポイントにリクエストを送信します。更新する既存のドキュメントごとに、_id フィールドと、ドキュメントに適用するパッチ操作のリストを指定するだけです。
Rockset API は、アプリケーション コードからコレクションに直接データを挿入できるように、ドキュメントの追加エンドポイントも公開します。既存のドキュメントを削除するには、削除するドキュメントの _id フィールドを指定して、Rockset API のドキュメントの削除エンドポイントにリクエストを送信するだけです。
Elasticsearch とは異なり、Rockset では 1 つのレプリカのみが RocksDB リモート コンパクションを使用してインデックス作成とコンパクションを実行します。これにより、特に耐久性のために複数のレプリカが使用されている場合に、インデックス作成に必要な CPU の量が削減されます。
Rockset での取り込み時に、取り込み変換を使用して、生のソース データに適用する必要なデータ変換を指定できます。後日、取り込み変換を変更する場合は、データを再インデックスする必要があります。
とはいえ、Rockset はスキーマレスな取り込みを可能にし、データのすべてのフィールドの値を動的に型付けします。データやクエリのサイズや形状が変わっても、Rockset はパフォーマンスを維持し、データの再インデックスを必要としません。
Rockset は、再インデックス化を必要とせずに、数百テラバイトのデータに拡張できます。これは Rockset のシャーディング戦略に由来しています。顧客が仮想インスタンスに割り当てるコンピューティングが増加すると、シャードのサブセットがシャッフルされ、クラスター全体でより適切に分散され、より並列化され、より高速なインデックス化とクエリ実行が可能になります。その結果、これらのシナリオでは再インデックス化を行う必要がありません。
Elasticsearch は、データが頻繁に更新、挿入、削除されないログ分析用に設計されました。時間の経過とともに、チームは Elasticsearch の使用範囲を拡大し、頻繁に変化するトランザクション データのリアルタイム分析のためのセカンダリ データ ストアおよびインデックス エンジンとして Elasticsearch を使用するようになりました。これは、特にリアルタイムのデータ取り込みを最適化しているチームにとってはコストのかかる取り組みになる可能性があり、また、かなりの管理オーバーヘッドも伴います。
一方、Rockset はリアルタイム分析用に設計されており、新しいデータが生成されてから 2 秒以内にクエリを実行できます。このユースケースを解決するために、Rockset はインプレース挿入、更新、削除をサポートし、コンピューティングを節約し、ドキュメントの再インデックスの使用を制限します。Rockset はコネクタと取り込みの管理オーバーヘッドも認識しており、プラットフォーム アプローチを採用して、リアルタイム コネクタをクラウド サービスに組み込んでいます。
全体的に、リアルタイム分析のためにElasticsearch から Rockset に移行した企業は、コンピューティング料金だけで 44% を節約しています。数日で Elasticsearch から Rockset に切り替えるエンジニアリング チームの波に加わってください。今すぐ無料トライアルを開始してください。