paint-brush
Tencent Music の ClickHouse から Apache Doris への移行@junzhang
2,870 測定値
2,870 測定値

Tencent Music の ClickHouse から Apache Doris への移行

Jun Zhang13m2023/03/14
Read on Terminal Reader

長すぎる; 読むには

Tencent Music は、月間 8 億人のアクティブ ユーザーを持つ音楽ストリーミング サービス プロバイダーです。音楽ライブラリには、録音された音楽、ライブ音楽、オーディオ、ビデオなど、あらゆる形式とタイプのデータが含まれています。データのほとんどを Tencent Data Warehouse (TDW) に保存して処理しました。ClickHouse はフラット テーブルの処理に優れていましたが、すべてのデータをフラットなテーブルに注ぎ込み、それを日ごとに分割するためのストレージ リソースの膨大な浪費。 Apache Doris が解決策でした。
featured image - Tencent Music の ClickHouse から Apache Doris への移行
Jun Zhang HackerNoon profile picture
0-item

この記事は、私と同僚のカイ・ダイの共著です。私たちは 2 人とも Tencent Music (NYSE: TME) のデータ プラットフォーム エンジニアです。Tencent Music は、月間 8 億人のアクティブ ユーザーを持つ音楽ストリーミング サービス プロバイダーです。ここで数値を削除するのは、自慢するためではなく、貧しい同僚と私が毎日取り組まなければならないデータの海を示唆するためです。


ClickHouseの使用目的

Tencent Music の音楽ライブラリには、録音された音楽、ライブ音楽、オーディオ、ビデオなど、あらゆる形式と種類のデータが含まれています。データ プラットフォーム エンジニアとしての私たちの仕事は、データから情報を抽出することです。これに基づいて、チームメイトはより良い決定を下すことができます。ユーザーと音楽パートナーをサポートするため。


具体的には、曲、歌詞、メロディー、アルバム、およびアーティストの総合的な分析を行い、このすべての情報をデータ資産に変換し、インベントリのカウント、ユーザー プロファイリング、指標分析、およびグループ ターゲティングのために内部データ ユーザーに渡します。 .



ほとんどのデータを Tencent Data Warehouse (TDW) に保存して処理しました。TDW はオフライン データ プラットフォームで、データをさまざまなタグやメトリック システムに入れ、各オブジェクト (曲、アーティストなど) を中心としたフラット テーブルを作成しました。


次に、分析のためにフラットテーブルを ClickHouse にインポートし、データ検索とグループターゲティングのために Elasticsearch にインポートしました。


その後、データ アナリストは、必要なタグとメトリックの下でデータを使用して、さまざまな使用シナリオのデータセットを形成し、その間に独自のタグとメトリックを作成できました。


データ処理パイプラインは次のようになります。



ClickHouseの問題


上記のパイプラインを使用しているときに、いくつかの問題が発生しました。


  1. 部分更新: 列の部分更新はサポートされていませんでした。そのため、いずれかのデータ ソースからの遅延により、フラット テーブルの作成が遅れ、データの適時性が損なわれる可能性があります。


  2. 高いストレージ コスト: さまざまなタグとメトリックの下にあるデータが、さまざまな頻度で更新されました。 ClickHouse がフラット テーブルの処理に優れていたのと同じくらい、すべてのデータをフラット テーブルに注ぎ込み、それを日ごとに分割することは、ストレージ リソースの膨大な浪費であり、それに伴うメンテナンス コストは言うまでもありません。


  3. 高い維持費: アーキテクチャ的に言えば、ClickHouse は、ストレージ ノードと計算ノードの強力な結合によって特徴付けられました。そのコンポーネントは相互依存性が高く、クラスターが不安定になるリスクが増していました。さらに、ClickHouse と Elasticsearch にまたがるフェデレーテッド クエリの場合、膨大な量の接続の問題に対処する必要がありました。それはただ退屈でした。


Apache Doris への移行

リアルタイム分析データベースであるApache Doris は、問題を解決するためにまさに必要としていた機能をいくつか備えています。


  1. 部分更新: Doris はさまざまなデータ モデルをサポートしています。その中で、集計モデルは列のリアルタイムの部分更新をサポートしています。これに基づいて、生データを Doris に直接取り込み、そこでフラット テーブルを作成できます。取り込みは次のようになります。まず、Spark を使用してデータを Kafka に読み込みます。その後、増分データは Flink を介して Doris と Elasticsearch に更新されます。その間、Flink は Doris と Elasticsearch の負担を解放するためにデータを事前に集約します。


  2. ストレージ コスト: Doris は、Hive、Iceberg、Hudi、MySQL、Elasticsearch 全体で複数テーブルの結合クエリとフェデレーション クエリをサポートしています。これにより、大きなフラット テーブルを小さなテーブルに分割し、更新頻度で分割することができます。これにより、ストレージの負荷が軽減され、クエリのスループットが向上するという利点があります。


  3. 維持費: Doris はシンプルなアーキテクチャで、MySQL プロトコルと互換性があります。 Doris のデプロイには、他のシステムに依存しない 2 つのプロセス (FE と BE) のみが含まれるため、操作と保守が容易になります。また、Doris は外部 ES データ テーブルのクエリをサポートしています。 ES のメタデータと簡単に連携し、ES からテーブル スキーマを自動的にマッピングできるため、複雑な接続に取り組むことなく、Doris を介して Elasticsearch データに対してクエリを実行できます。


さらに、Doris は、HDFS や S3 などのリモート ストレージからのバッチ インポート、MySQL binlog や Kafka からのデータ読み取り、MySQL、Oracle、および PostgreSQL からのリアルタイム データ同期またはバッチ インポートなど、複数のデータ取り込み方法をサポートしています。整合性プロトコルを通じてサービスの可用性とデータの信頼性を確保し、自動デバッグが可能です。これは、オペレーターとメンテナーにとって朗報です。


統計的に言えば、これらの機能により、ストレージ コストが 42%、開発コストが 40% 削減されました。


Doris を使用している間、オープン ソースの Apache Doris コミュニティから多くのサポートを受け、現在 Apache Doris の商用バージョンを実行している SelectDB チームからタイムリーな支援を受けました。



私たちのニーズに応えるためのさらなる改善

セマンティック レイヤーを導入する

データセットについて言えば、明るい面として、当社のデータ アナリストには、都合のよいときにタグとメトリックを再定義して組み合わせる自由が与えられています。しかし、暗い面では、タグとメトリック システムの異質性が高いため、それらの使用と管理がより困難になります。


私たちの解決策は、データ処理パイプラインにセマンティック レイヤーを導入することです。セマンティック レイヤーは、すべての技術用語が内部データ ユーザーにとってよりわかりやすい概念に変換される場所です。言い換えれば、タグとメトリックを、データの定義と管理のための第一級の市民に変えています。



なぜこれが役立つのでしょうか?


データ アナリストの場合、すべてのタグとメトリックがセマンティック レイヤーで作成および共有されるため、混乱が少なくなり、効率が向上します。


データ ユーザーは、独自のデータセットを作成したり、シナリオごとに適用可能なデータセットを特定したりする必要がなくなり、指定したタグセットとメトリックセットに対してクエリを実行するだけで済みます。

セマンティック レイヤーのアップグレード

セマンティック レイヤーでタグとメトリックを明示的に定義するだけでは十分ではありませんでした。標準化されたデータ処理システムを構築するための次の目標は、データ処理パイプライン全体で一貫したタグと指標の定義を確保することでした。


このため、セマンティック レイヤーをデータ管理システムの心臓部にしました。



それはどのように機能しますか?


TDW のすべてのコンピューティング ロジックは、単一のタグまたはメトリックの形式でセマンティック レイヤーで定義されます。


セマンティック レイヤーは、アプリケーション側からロジック クエリを受け取り、それに応じてエンジンを選択し、SQL を生成します。次に、実行のために SQL コマンドを TDW に送信します。その間、構成とデータ取り込みタスクを Doris に送信し、どのメトリックとタグを高速化する必要があるかを決定することもあります。


このようにして、タグとメトリックをより管理しやすくしました。軟膏のフライは、各タグとメトリックが個別に定義されているため、クエリの有効な SQL ステートメントの生成を自動化するのに苦労していることです。これについて何か考えがある場合は、ぜひご相談ください。


Apache Doris をフル活用する


ご覧のとおり、Apache Doris は私たちのソリューションで極めて重要な役割を果たしています。 Doris の使用を最適化すると、全体的なデータ処理効率が大幅に向上します。そのため、このパートでは、データの取り込みとクエリを高速化し、コストを削減するために Doris を使用して行っていることを共有します。


私たちが欲しいもの?


現在、TDW の 80 以上のソース テーブルから派生した 800 以上のタグと 1300 以上の指標があります。 TDW から Doris にデータをインポートするとき、次のことを達成したいと考えています。


  • リアルタイムの可用性: 従来の T+1 オフライン データ インジェストに加えて、リアルタイムのタグ付けが必要です。


  • 部分更新: 各ソース テーブルは、独自の ETL タスクを通じてさまざまなペースでデータを生成し、タグとメトリックの一部のみを含むため、列の部分更新のサポートが必要です。


  • 高いパフォーマンス: グループのターゲティング、分析、およびレポートのシナリオでは、わずか数秒の応答時間が必要です。


  • 低コスト: 可能な限りコストを削減したいと考えています。


私達がすること?


  1. TDW の代わりに Flink でフラット テーブルを生成する



TDW でフラット テーブルを生成することには、いくつかの欠点があります。


  • 高いストレージ コスト: TDW は、個別の 80 以上のソース テーブルとは別に、特別にフラットなテーブルを維持する必要があります。それは巨大な冗長性です。


  • リアルタイム性が低い: ソース テーブルの遅延が増大し、データ リンク全体が遅延します。


  • 高い開発コスト: リアルタイム性を実現するには、追加の開発努力とリソースが必要です。

それどころか、Doris でフラット テーブルを生成することは、はるかに簡単で安価です。プロセスは次のとおりです。


  • Spark を使用して、オフラインで新しいデータを Kafka にインポートします。
  • Flink を使用して Kafka データを消費します。
  • 主キー ID を介してフラット テーブルを作成します。
  • フラット テーブルを Doris にインポートします。以下に示すように、Flink は "ID"=1 の 5 行のデータを Doris の 1 行に集約し、Doris のデータ書き込みのプレッシャーを軽減しました。


これにより、TDW はデータの 2 つのコピーを維持する必要がなくなり、KafKa は取り込み待ちの新しいデータを保存するだけで済むため、ストレージ コストを大幅に削減できます。さらに、必要な ETL ロジックを Flink に追加し、オフラインおよびリアルタイムのデータ取り込みのために多くの開発ロジックを再利用できます。


  1. 列にスマートな名前を付ける


前述したように、Doris の集約モデルでは、列の部分的な更新が可能です。ここでは、参照用に Doris の他のデータ モデルを簡単に紹介します。


一意のモデル: これは、主キーの一意性が必要なシナリオに適用されます。同じ主キー ID の最新のデータのみを保持します。 (私たちが知る限り、Apache Doris コミュニティは Unique Model にも列の部分的な更新を含めることを計画しています。)


複製モデル: このモデルでは、すべての元のデータが事前集計や重複排除なしでそのまま保存されます。


データ モデルを決定した後、列に名前を付ける方法を考える必要がありました。タグまたはメトリックを列名として使用することは、次の理由で選択できませんでした。


Ⅰ.内部データ ユーザーはメトリックまたはタグの名前を変更する必要があるかもしれませんが、Doris 1.1.3 は列名の変更をサポートしていません。


Ⅱ.タグはオンラインとオフラインで頻繁に使用される場合があります。列の追加と削除が必要な場合は、時間がかかるだけでなく、クエリのパフォーマンスにも悪影響を及ぼします。代わりに、次のことを行います。


  • タグとメトリックの名前を柔軟に変更するために、MySQL テーブルを使用してメタデータ (名前、グローバルに一意の ID、ステータスなど) を保存します。名前の変更はメタデータでのみ発生し、Doris のテーブル スキーマには影響しません。たとえば、 song_nameに 4 の ID が指定されている場合、それは Doris に a4 の列名で格納されます。次に、 song_nameクエリに含まれる場合、SQL で a4 に変換されます。


  • タグのオンライン化とオフライン化については、タグの使用頻度に基づいてタグを分類します。最も使用頻度の低いものには、メタデータでオフライン マークが付けられます。オフライン タグの下に新しいデータは配置されませんが、それらのタグの下にある既存のデータは引き続き使用できます。


  • 新しく追加されたタグとメトリックをリアルタイムで利用できるようにするために、名前 ID のマッピングに基づいて、Doris テーブルにいくつかの ID 列を事前に作成します。これらの予約済み ID 列は、新しく追加されたタグと指標に割り当てられます。したがって、テーブル スキーマの変更とそれに伴うオーバーヘッドを回避できます。私たちの経験によると、タグとメトリックが追加されてからわずか 10 分後に、それらの下のデータが利用可能になります。


注目すべきは、最近リリースされた Doris 1.2.0 が Light Schema Change をサポートしていることです。つまり、列を追加または削除するには、FE でメタデータを変更するだけで済みます。また、テーブルのライト スキーマ変更を有効にしている限り、データ テーブルの列の名前を変更できます。これは私たちにとって大きなトラブルセーバーです。


  1. 日付書き込みの最適化


毎日のオフライン データ取り込み時間を 75% 削減し、CUMU 圧縮スコアを 600+ から 100 に削減したいくつかのプラクティスを次に示します。


  • Flink の事前集計: 上記のとおりです。


  • 書き込みバッチの自動サイジング: Flink リソースの使用量を削減するために、1 つの Kafka トピック内のデータをさまざまな Doris テーブルに書き込むことができるようにし、データ量に基づくバッチ サイズの自動変更を実現します。


  • Doris データ書き込みの最適化: タブレットとバケットのサイズ、および各シナリオの圧縮パラメーターを微調整します。


     max_XXXX_compaction_thread max_cumulative_compaction_num_singleton_deltas


  • BE コミット ロジックの最適化: BE リストの定期的なキャッシュを実行し、バッチごとに BE ノードにコミットし、より細かい負荷分散の粒度を使用します。



  1. クエリで Dori-on-ES を使用する

    データ クエリの約 60% には、グループ ターゲティングが含まれます。グループ ターゲティングとは、一連のタグをフィルターとして使用してターゲット データを見つけることです。これにより、データ処理アーキテクチャにいくつかの要件が課せられます。


  • APP ユーザーに関連するグループ ターゲティングには、非常に複雑なロジックが含まれる場合があります。つまり、システムは何百ものタグをフィルターとして同時にサポートする必要があります。


  • ほとんどのグループ ターゲティング シナリオでは、最新のタグ データのみが必要です。ただし、メトリクス クエリは履歴データをサポートする必要があります。


  • データ ユーザーは、グループ ターゲティングの後、メトリック データのさらに集計された分析を実行する必要がある場合があります。


  • データ ユーザーは、グループ ターゲティングの後で、タグとメトリックに対して詳細なクエリを実行する必要がある場合もあります。


検討の結果、Doris-on-ES を採用することにしました。 Doris は各シナリオのメトリック データをパーティション テーブルとして保存する場所であり、Elasticsearch はすべてのタグ データを保存します。 Doris-on-ES ソリューションは、Doris の分散クエリ プランニング機能と Elasticsearch の全文検索機能を組み合わせたものです。クエリ パターンは次のとおりです。


 SELECT tag, agg(metric) FROM Doris WHERE id in (select id from Es where tagFilter) GROUP BY tag


示されているように、Elasticsearch にある ID データは、メトリクス分析のために Doris のサブクエリで使用されます。実際には、クエリの応答時間はターゲット グループのサイズに関連していることがわかりました。ターゲット グループに 100 万を超えるオブジェクトが含まれている場合、クエリには最大 60 秒かかります。それ以上の場合、タイムアウトエラーが発生する場合があります。調査の結果、2 つの最大の時間浪費要因を特定しました。


I. Doris BE が Elasticsearch (デフォルトでは一度に 1024 行) からデータをプルすると、100 万を超えるオブジェクトのターゲット グループの場合、ネットワーク I/O オーバーヘッドが膨大になる可能性があります。


Ⅱ.データをプルした後、Doris BE は SHUFFLE/BROADCAST を介してローカル メトリック テーブルとの結合操作を実行する必要がありますが、これには多くの費用がかかります。



したがって、次の最適化を行います。


  • 最適化を有効にするかどうかを指定するクエリ セッション変数es_optimizeを追加します。


  • ES へのデータ書き込みでは、主キー ID をハッシュした後にバケット番号を格納する BK 列を追加します。アルゴリズムは、Doris (CRC32) のバケット アルゴリズムと同じです。


  • Doris BE を使用して Bucket Join 実行計画を生成し、バケット番号を BE ScanNode にディスパッチして ES にプッシュします。


  • ES を使用してクエリされたデータを圧縮します。複数のデータ取得を 1 つにまとめ、ネットワーク I/O オーバーヘッドを削減します。


  • Doris BE がローカル メトリック テーブルに関連するバケットのデータのみをプルし、Doris BE 間のデータ シャッフルを回避するためにローカル Join 操作を直接実行することを確認してください。



    その結果、大規模なグループ ターゲティングのクエリ応答時間が 60 秒から 3.7 秒に短縮されました。コミュニティ情報によると、まもなくリリースされるバージョン 2.0.0 以降、Doris は逆索引付けをサポートする予定です。この新しいバージョンでは、テキスト タイプの全文検索、テキスト、数値、および日時の等価性または範囲フィルタリングを実行し、逆インデックスが配列タイプをサポートしているため、フィルタリングで AND、OR、NOT ロジックを便利に組み合わせることができます。 Doris のこの新機能は、同じタスクで Elasticsearch よりも 3 ~ 5 倍優れたパフォーマンスを提供することが期待されています。


  1. データの管理を改善する


Doris のコールド データとホット データの分離能力は、データ処理におけるコスト削減戦略の基盤を提供します。


  • Doris の TTL メカニズムに基づいて、現在の年のデータのみを Doris に保存し、その前の履歴データを TDW に配置して、ストレージ コストを削減します。


  • データ パーティションごとにコピーの数を変えています。例えば、使用頻度の高い直近3か月分のデータを3部、半年以上前のデータを1部、その間のデータを2部としています。


Doris はホット データをコールド データに変換することをサポートしているため、過去 7 日間のデータのみを SSD に保存し、それよりも古いデータを HDD に転送してストレージを安価にします。

結論

ここまでスクロールして、長い間読んでいただきありがとうございます。 ClickHouse から Doris への移行中に、歓声と涙、学んだ教訓、およびいくつかの価値があるかもしれないいくつかのプラクティスを共有しました. Apache Doris コミュニティと SelectDB チームからの支援には本当に感謝していますが、コールド データとホット データの自動識別、頻繁に使用されるタグ/メトリクスの事前計算を実現しようとしているため、しばらくの間、彼らを追跡することになるかもしれません。マテリアライズド ビューなどを使用したコード ロジックの簡素化など。