立ち止まって日常生活について考えてみると、すべてを出来事として簡単に見ることができます。次のシーケンスを検討してください。
ここで延々と続けることもできますが、私は言いたいことを言いました。人生は一連の出来事です。その事実を考えると、今日の新しいソフトウェア システムをどのように設計しますか?さまざまな結果を収集して任意の間隔で処理しますか、それとも 1 日の終わりまで待ってから処理しますか?いいえ、あなたはしません。各イベントが発生したらすぐに対応する必要があります。確かに、個々の状況に応じてすぐに対応できない場合もあるかもしれません… 1 日分のトランザクションを一度にダンプすることを考えてみてください。それでも、データを受け取ったらすぐに行動するでしょう。
では、イベントを操作するためのソフトウェア システムをどのように実装すればよいでしょうか。答えはストリーム処理です。
イベント データを処理するためのデファクト テクノロジとなったストリーム処理は、イベントをアプリケーションの主要な入力または出力と見なすソフトウェア開発へのアプローチです。たとえば、情報に基づいて行動したり、不正なクレジット カード購入の可能性がある場合に対応したりするのを待っていても意味がありません。また、マイクロサービスでレコードの受信フローを処理する必要がある場合もあり、それらを最も効率的に処理することがアプリケーションにとって最適です。
ユース ケースが何であれ、イベント ストリーミング アプローチがイベントを処理するための最良のアプローチであると言っても過言ではありません。
このブログ投稿では、Apache Kafka®、.NET プロデューサーおよびコンシューマー クライアント、および Microsoft のTask Parallel Library (TPL)を使用して、イベント ストリーミング アプリケーションを構築します。一見すると、これら 3 つすべてを一緒に作業する可能性が高い候補として自動的にまとめることはできないかもしれません。確かに、Kafka と .NET クライアントは素晴らしい組み合わせですが、TPL はどこに当てはまりますか?
多くの場合、スループットは重要な要件であり、Kafka からの消費とダウンストリーム処理の間のインピーダンスの不一致によるボトルネックを回避するために、機会があればいつでもプロセス内並列化をお勧めします。
3 つのコンポーネントがどのように連携して、堅牢で効率的なイベント ストリーミング アプリケーションを構築するかを読んでください。最良の部分は、Kafka クライアントと TPL が面倒な作業のほとんどを処理することです。ビジネス ロジックに集中するだけで済みます。
アプリケーションに飛び込む前に、各コンポーネントについて簡単に説明しましょう。
ストリーム処理がイベント ストリームを処理するための事実上の標準である場合、 Apache Kafka はイベント ストリーミング アプリケーションを構築するための事実上の標準です。 Apache Kafka は、高度にスケーラブルで弾力性があり、フォールト トレラントで安全な方法で提供される分散ログです。簡単に言えば、Kafka はブローカー (サーバー) とクライアントを使用します。ブローカーは、データ センターまたはクラウド リージョンにまたがることができる Kafka クラスターの分散ストレージ レイヤーを形成します。クライアントは、ブローカー クラスターからイベント データを読み書きする機能を提供します。 Kafka クラスターはフォールト トレラントです。いずれかのブローカーに障害が発生した場合、他のブローカーが作業を引き継いで継続的な運用を保証します。
前の段落で、クライアントが Kafka ブローカー クラスターに書き込みまたは読み取りを行うことを述べました。 Apache Kafka は Java クライアントにバンドルされていますが、このブログ記事のアプリケーションの中心にある .NET Kafka プロデューサーとコンシューマーなど、他のいくつかのクライアントも利用できます。 .NET プロデューサーとコンシューマーは、Kafka によるイベント ストリーミングのパワーを .NET 開発者にもたらします。 .NET クライアントの詳細については、ドキュメントを参照してください。
Task Parallel Library ( TPL ) は、「System.Threading および System.Threading.Tasks 名前空間内のパブリック型と API のセット」であり、並行アプリケーションの作成作業を簡素化します。 TPL は、次の詳細を処理することで、同時実行の追加をより管理しやすいタスクにします。
1. 作業の分割の処理 2. ThreadPool でのスレッドのスケジューリング 3. キャンセル、状態管理などの低レベルの詳細
つまり、TPL を使用すると、アプリケーションの処理パフォーマンスを最大化しながら、ビジネス ロジックに集中できるということです。具体的には、TPL のデータフロー ライブラリサブセットを使用します。
データフロー ライブラリは、インプロセス メッセージ パッシングとパイプライン タスクを可能にするアクター ベースのプログラミング モデルです。 Dataflow コンポーネントは、TPL の型とスケジューリング インフラストラクチャに基づいて構築され、C# 言語とシームレスに統合されます。通常、Kafka からの読み取りは非常に高速ですが、通常、処理 (DB 呼び出しまたは RPC 呼び出し) がボトルネックになります。順序付けの保証を犠牲にすることなく、より高いスループットを達成するために利用できる並列化の機会は、検討する価値があります。
このブログ投稿では、これらの Dataflow コンポーネントを .NET Kafka クライアントと共に活用して、データが利用可能になったときにデータを処理するストリーム処理アプリケーションを構築します。
作成するアプリケーションに入る前に、次のことを行います。 TPL データフロー ライブラリの構成要素に関する背景情報を提供する必要があります。ここで説明するアプローチは、高いスループットを必要とする CPU と I/O を集中的に使用するタスクがある場合に最も適しています。 TPL データフロー ライブラリは、着信データまたはレコードをバッファリングおよび処理できるブロックで構成され、ブロックは次の 3 つのカテゴリのいずれかに分類されます。
ソース ブロック – データのソースとして機能し、他のブロックはそこから読み取ることができます。
ターゲット ブロック – 他のブロックから書き込むことができるデータのレシーバーまたはシンク。
Propagator ブロック – ソース ブロックとターゲット ブロックの両方として動作します。
さまざまなブロックを取り、それらを接続して、線形処理パイプラインまたは処理のより複雑なグラフを形成します。次の図を検討してください。
データフロー ライブラリには、バッファリング、実行、グループ化の 3 つのカテゴリに分類される定義済みのブロック タイプがいくつか用意されています。このブログ投稿用に開発されたプロジェクトでは、バッファリング タイプと実行タイプを使用しています。 BufferBlock<T> は、データをバッファリングする汎用構造体であり、プロデューサー/コンシューマー アプリケーションでの使用に最適です。 BufferBlock は、受信データの処理に先入れ先出しキューを使用します。
BufferBlock (およびそれを拡張するクラス) は、データフロー ライブラリでメッセージを直接読み書きできる唯一のブロック タイプです。他のタイプは、ブロックからメッセージを受信したり、ブロックにメッセージを送信したりすることを期待しています。このため、ソース ブロックを作成してISourceBlock
インターフェイスを実装し、 ITargetBlock
インターフェイスを実装するシンク ブロックを実装するときに、 BufferBlock
をデリゲートとして使用しました。
このアプリケーションで使用されるもう 1 つの Dataflow ブロック タイプは、TransformBlock <TInput, TOutput>です。データフロー ライブラリのほとんどのブロック タイプと同様に、変換ブロックが受け取る入力レコードごとに実行するデリゲートとして機能するFunc<TInput, TOutput>
を提供することによって、TransformBlock のインスタンスを作成します。
Dataflow ブロックの 2 つの重要な機能は、バッファするレコードの数と並列処理のレベルを制御できることです。
最大バッファー容量を設定することにより、アプリケーションが処理パイプラインのある時点で長時間の待機に遭遇した場合、アプリケーションは自動的にバック プレッシャーを適用します。この背圧は、データの過剰蓄積を防ぐために必要です。その後、問題が治まり、バッファのサイズが小さくなると、再びデータが消費されます。
ブロックの同時実行を設定する機能は、パフォーマンスにとって重要です。 1 つのブロックが CPU または I/O 集中型のタスクを実行する場合、処理を並列化してスループットを向上させる自然な傾向があります。ただし、同時実行性を追加すると、処理順序という問題が発生する可能性があります。ブロックのタスクにスレッドを追加すると、データの出力順序を保証できなくなります。場合によっては順序が問題にならないこともありますが、重要な場合は、考慮すべき深刻なトレードオフになります。つまり、同時実行性によるスループットの向上と順序出力の処理です。幸いなことに、Dataflow ライブラリでは、このトレードオフを行う必要はありません。
ブロックの並列処理を複数に設定すると、フレームワークは入力レコードの元の順序を維持することを保証します (並列処理による順序の維持は構成可能であり、デフォルト値は true です)。データの元の順序が A、B、C の場合、出力順序は A、B、C になります。懐疑的ですか?私は自分がそうだったことを知っているので、テストしたところ、宣伝どおりに機能することがわかりました.このテストについては、この記事の後半で説明します。並列処理の増加は、ステートレス操作またはassociative および commutativeであるステートフル操作でのみ行う必要があることに注意してください。つまり、操作の順序またはグループ化を変更しても結果には影響しません。
この時点で、これがどこに向かっているのかがわかります。可能な限り最速の方法で処理する必要があるイベントを表す Kafka トピックがあります。そのため、.NET KafkaConsumer を含むソース ブロック、ビジネス ロジックを実行する処理ブロック、および最終結果を Kafka トピックに書き戻す .NET KafkaProducer を含むシンク ブロックで構成されるストリーミング アプリケーションを構築します。アプリケーションの概要図を次に示します。
アプリケーションの構造は次のとおりです。
BufferBlock
デリゲートのラップBufferBlock
デリゲートのラップ
次に、アプリケーションの全体的なフローと、Kafka とデータフロー ライブラリを活用して強力なイベント ストリーミング アプリケーションを構築する際の重要なポイントについて説明します。
シナリオは次のとおりです。オンライン ストアから購入のレコードを受け取る Kafka トピックがあり、受信データ形式は JSON です。購入の詳細に ML 推論を適用して、これらの購入イベントを処理したいと考えています。さらに、JSON レコードを Protobuf 形式に変換したいと考えています。これは、全社的なデータ形式であるためです。もちろん、アプリケーションのスループットは不可欠です。 ML 操作は CPU を集中的に使用するため、アプリケーションのスループットを最大化する方法が必要です。そのため、アプリケーションのその部分の並列化を利用できます。
ソース ブロックから始めて、ストリーミング アプリケーションの重要なポイントを見ていきましょう。 ISourceBlock
インターフェイスの実装については前に説明しましたが、 BufferBlock
もISourceBlock
実装しているため、すべてのインターフェイス メソッドを満たすデリゲートとして使用します。したがって、ソース ブロックの実装は KafkaConsumer と BufferBlock をラップします。ソース ブロック内には、コンシューマーが消費したレコードをバッファーに渡すことだけを担当する別のスレッドがあります。そこから、バッファはレコードをパイプラインの次のブロックに転送します。
レコードをバッファに転送する前に、 ConsumeRecord
( Consumer.consume
呼び出しによって返される) は、キーと値に加えて、アプリケーションにとって重要な元のパーティションとオフセットをキャプチャするRecord
抽象化によってラップされます。その理由はすぐに説明します。また、パイプライン全体がRecord
抽象化で機能することにも注意してください。したがって、すべての変換により、キー、値、および元のオフセットのようなその他の重要なフィールドをラップする新しいRecord
オブジェクトが生成され、パイプライン全体で保持されます。
アプリケーションは、処理をいくつかの異なるブロックに分割します。各ブロックは処理チェーンの次のステップにリンクするため、ソース ブロックは、逆シリアル化を処理する最初のブロックにリンクします。 .NET KafkaConsumer はレコードの逆シリアル化を処理できますが、シリアル化されたペイロードをコンシューマーに渡し、Transform ブロックで逆シリアル化します。逆シリアル化は CPU を集中的に使用する可能性があるため、これを処理ブロックに配置すると、必要に応じて操作を並列化できます。
逆シリアル化の後、レコードは、JSON ペイロードを Protobuf 形式の Purchase データ モデル オブジェクトに変換する別の Transform ブロックに流れます。さらに興味深いのは、データが次のブロックに入るときです。これは、購入トランザクションを完全に完了するために必要な CPU 集中型のタスクを表しています。アプリケーションはこの部分をシミュレートし、提供された関数は 1 ~ 3 秒のランダムな時間でスリープします。
このシミュレートされた処理ブロックは、Dataflow ブロック フレームワークの機能を活用する場所です。 Dataflow ブロックをインスタンス化するときは、検出された各レコードに適用されるデリゲート Func インスタンスと、 ExecutionDataflowBlockOptions
インスタンスを提供します。前に Dataflow ブロックの構成について説明しましたが、ここでもう一度簡単に確認します。 ExecutionDataflowBlockOptions
は、そのブロックの最大バッファー サイズと最大並列化度という 2 つの重要なプロパティが含まれています。
パイプライン内のすべてのブロックのバッファ サイズ構成を 10,000 レコードに設定しますが、シミュレートされた CPU 集中型を除いて、デフォルトの並列化レベル 1 を使用し、4 に設定します。デフォルトの Dataflow バッファ サイズは無制限。パフォーマンスへの影響については次のセクションで説明しますが、ここではアプリケーションの概要を説明します。
集中処理ブロックは、sink ブロックにフィードするシリアル化変換ブロックに転送されます。次に、.NET KafkaProducer がラップされ、最終結果が Kafka トピックに生成されます。シンク ブロックは、デリゲートBufferBlock
と生成用の別のスレッドも使用します。スレッドは、バッファから次に利用可能なレコードを取得します。次に、 KafkaProducer.Produce
メソッドを呼び出して、 DeliveryReport
をラップするAction
デリゲートを渡します。プロデュース リクエストが完了すると、プロデューサー I/O スレッドがAction
デリゲートを実行します。
これで、アプリケーションの大まかなチュートリアルは完了です。ここで、設定の重要な部分であるコミット オフセットの処理方法について説明しましょう。これは、コンシューマーからレコードをパイプライン処理する場合に不可欠です。
Kafka でデータを処理する場合、アプリケーションが所定のポイントまで正常に処理したレコードのオフセット (オフセットは Kafka トピック内のレコードの論理位置) を定期的にコミットします。では、なぜオフセットをコミットするのでしょうか?これは簡単な質問です。コンシューマーが制御された方法で、またはエラーによってシャットダウンすると、最後にコミットされた既知のオフセットから処理が再開されます。オフセットを定期的にコミットすることにより、コンシューマーはレコードを再処理しません。または、いくつかのレコードを処理した後、コミットする前にアプリケーションがシャットダウンした場合に、少なくとも最小限の量を再処理しません。このアプローチは、少なくとも 1 回の処理として知られています。これは、レコードが少なくとも 1 回処理されることを保証し、エラーが発生した場合、それらの一部が再処理される可能性がありますが、代替手段がデータ損失のリスクがある場合、これは優れたオプションです。 Kafka は 1 回限りの処理保証も提供します。このブログ投稿ではトランザクションについては触れませんが、Kafka でのトランザクションの詳細については、次の記事を参照してください。
オフセットをコミットする方法はいくつかありますが、最も単純で最も基本的な方法は自動コミット アプローチです。コンシューマーがレコードを読み取り、アプリケーションがそれらを処理します。 (レコードのタイムスタンプに基づいて) 構成可能な時間が経過すると、コンシューマーは既に消費されたレコードのオフセットをコミットします。通常、自動コミットは妥当なアプローチです。典型的な消費プロセス ループでは、以前に消費したすべてのレコードを正常に処理するまで、消費者に戻りません。予期しないエラーまたはシャットダウンが発生した場合、コードはコンシューマーに返されないため、コミットは発生しません。しかし、ここでのアプリケーションでは、パイプライン処理を行っています。消費されたレコードを取得してバッファにプッシュし、さらに消費するために戻ってきます。処理が成功するのを待つ必要はありません。
パイプライン アプローチでは、少なくとも 1 回の処理をどのように保証しますか?メソッドIConsumer.StoreOffset
活用します。このメソッドは、単一のパラメーター ( TopicPartitionOffset
) を処理し、次のコミットのために (他のオフセットと共に) 保存します。オフセット管理に対するこのアプローチは、自動コミットが Java API でどのように機能するかを対照的に示していることに注意してください。
そのため、コミット手順は次のように動作します。シンク ブロックが Kafka に生成するレコードを取得すると、アクション デリゲートにも提供されます。プロデューサーがコールバックを実行すると、元のオフセットがコンシューマー (ソース ブロック内の同じインスタンス) に渡され、コンシューマーは StoreOffset メソッドを使用します。コンシューマーに対して自動コミットを引き続き有効にしていますが、コンシューマーがこの時点までに消費した最新のオフセットをやみくもにコミットするのではなく、コミットするオフセットを提供しています。
そのため、アプリケーションはパイプラインを使用しますが、ブローカーから ack を受信した後にのみコミットします。つまり、ブローカーとレプリカ ブローカーの最小セットがレコードを保存したことを意味します。このように動作すると、ブロックが作業を実行している間、コンシューマーが継続的にパイプラインを取得してフィードできるため、アプリケーションの進行が速くなります。このアプローチが可能なのは、.NET コンシューマー クライアントがスレッド セーフであるため (一部のメソッドはスレッド セーフではなく、そのように文書化されています)、単一のコンシューマーをソース ブロック スレッドとシンク ブロック スレッドの両方で安全に動作させることができます。
プロデュース段階でエラーが発生した場合、アプリケーションはエラーをログに記録し、レコードをネストされたBufferBlock
に戻して、プロデューサがブローカへのレコードの送信を再試行するようにします。しかし、この再試行ロジックはやみくもに行われるため、実際には、より堅牢なソリューションが必要になるでしょう。
アプリケーションがどのように機能するかを説明したので、パフォーマンスの数値を見てみましょう。すべてのテストは macOS Big Sur (11.6) ラップトップでローカルに実行されたため、このシナリオではマイレージが異なる場合があります。パフォーマンス テストのセットアップは簡単です。
JSON 形式の Kafka トピックに 100 万レコードを生成します。このステップは事前に行われており、テスト測定には含まれていません。
Kafka Dataflow 対応アプリケーションを起動し、すべてのブロックの並列化を 1 (デフォルト) に設定します。
アプリケーションは 100 万件のレコードを正常に処理するまで実行され、その後シャットダウンされます
すべてのレコードを処理するのにかかった時間を記録する
2 番目のラウンドの唯一の違いは、シミュレートされた CPU 集中型ブロックの MaxDegreeOfParallelism を 4 に設定したことです。
結果は次のとおりです。
レコード数 | 同時実行係数 | 時間(分) |
---|---|---|
1M | 1 | 38 |
1M | 4 | 9 |
そのため、構成を設定するだけで、イベントの順序を維持しながらスループットを大幅に向上させることができました。したがって、並列処理の最大次数を 4 に設定すると、4 倍以上の高速化が期待できます。しかし、このパフォーマンス向上の重要な部分は、正しく実行するのが難しい並行コードをまったく作成していないことです。
ブログ投稿の前半で、Dataflow ブロックとの同時実行がイベントの順序を維持することを検証するテストについて言及したので、それについて説明しましょう。トライアルには、次の手順が含まれていました。
Kafka トピックに 1M 整数 (0 ~ 999,999) を生成する
リファレンス アプリケーションを整数型で動作するように変更する
シミュレートされたリモート プロセス ブロックの同時実行レベル 1 でアプリケーションを実行します—Kafka トピックに生成します
同時実行レベル 4 でアプリケーションを再実行し、数値を別の Kafka トピックに生成します。
プログラムを実行して、両方の結果トピックから整数を消費し、それらをメモリ内の配列に格納します
両方の配列を比較し、それらが同じ順序であることを確認します
このテストの結果、両方の配列に 0 から 999,999 までの順序で整数が含まれており、複数の並列処理レベルで Dataflow ブロックを使用すると、受信データの処理順序が維持されることが証明されました。 Dataflow 並列処理の詳細については、 ドキュメントを参照してください。
この投稿では、.NET Kafka クライアントと Task Parallel Library を使用して、堅牢で高スループットのイベント ストリーミング アプリケーションを構築する方法を紹介しました。 Kafka は高パフォーマンスのイベント ストリーミングを提供し、Task Parallel Library は、すべての詳細を処理するためのバッファリングを備えた並行アプリケーションを作成するためのビルディング ブロックを提供し、開発者がビジネス ロジックに集中できるようにします。アプリケーションのシナリオは少し不自然ですが、うまくいけば、2 つのテクノロジを組み合わせることの有用性がわかります。試してみる-