やあ👋
私はAll Quietの創設者、Mads Quist です。私たちはMongoDBに基づいて独自のメッセージ キューを実装しました。ここでは次のことについてお話します。
- 車輪を再発明した理由
- 車輪を再発明した方法
1. 車輪を再発明した理由
なぜメッセージキューが必要なのでしょうか?
All Quiet は、PagerDuty に似た最新のインシデント管理プラットフォームです。
私たちのプラットフォームには次のような機能が必要です。
- ユーザー登録後のダブルオプトイン電子メールの非同期送信
- 登録後 24 時間後にリマインダーメールを送信する
- Firebase Cloud Messaging (FCM) を使用したプッシュ通知の送信は、ネットワークまたは負荷の問題により失敗する可能性があります。プッシュ通知はアプリにとって非常に重要であるため、問題がある場合は送信を再試行する必要があります。
- 統合外からの電子メールを受け入れ、インシデントとして処理します。このプロセスは失敗する可能性があるため、プロセスを分離し、キュー上の各電子メール ペイロードを処理する必要がありました。
当社の技術スタック
私たちの特定の要件を理解するには、技術スタックについて洞察を得ることが重要です。
- .NET Core 7 に基づいてモノリシック Web アプリケーションを実行します。
- .NET Core アプリケーションは Docker コンテナーで実行されます。
- 複数のコンテナを並行して実行します。
- HAProxy インスタンスは、HTTP リクエストを各コンテナに均等に分散し、高可用性のセットアップを保証します。
- 基盤となるデータベースとして MongoDB を使用し、可用性ゾーン全体にレプリケートされます。
- 上記のコンポーネントはすべて、AWS によって汎用 EC2 VM 上でホストされます。
車輪を再発明した理由
- 私たちは、各メッセージが 1 回だけ処理されることを保証しながら、複数のプロセスで同時に実行できるシンプルなキューイング メカニズムを求めていました。
- pub/sub パターンは必要ありませんでした。
- 私たちは CQRS / イベント ソーシングに基づく複雑な分散システムを目指していませんでした。分散システムの第一のルールは、分散しないことだからです。
- 私たちは、「退屈なテクノロジー」を選択するという哲学に従って、物事をできるだけシンプルに保ちたいと考えました。
最終的には、インフラストラクチャ内の可動部品の数を最小限に抑えることが重要です。私たちは優れた顧客のために素晴らしい機能を構築することを目指しており、サービスを確実に維持することが不可欠です。単一のデータベース システムを管理してファイブナインを超える稼働時間を達成することは、十分に困難です。では、なぜ追加の HA RabbitMQ クラスターの管理に負担がかかるのでしょうか?
AWS SQS を使用しないのはなぜでしょうか?
そうですね…AWS SQS、Google Cloud Tasks、Azure Queue Storage などのクラウド ソリューションは素晴らしいです。ただし、ベンダーロックインが発生する可能性があります。私たちは、クライアントにスケーラブルなサービスを提供しながら、独立性とコスト効率に優れることを目指しています。
2. 車輪をどのように再発明したか
メッセージキューとは何ですか?
メッセージ キューは、メッセージを保存するシステムです。メッセージのプロデューサはこれらをキューに保存し、後でコンシューマによって処理のためにデキューされます。これは、特にメッセージの処理がリソースを大量に消費するタスクである場合に、コンポーネントを分離する場合に非常に有益です。
キューはどのような特性を示す必要がありますか?
- データストレージとしてMongoDBを利用
- 各メッセージが 1 回だけ消費されることを保証する
- 複数のコンシューマがメッセージを同時に処理できるようにする
- メッセージ処理が失敗した場合に再試行が可能であることを確認する
- 将来のメッセージ消費のスケジュールを有効にする
- 保証された順序は必要ありません
- 高可用性の確保
- メッセージとその状態が永続的であり、再起動や長時間のダウンタイムにも耐えられるようにする
MongoDB は長年にわたって大幅に進化しており、上記の基準を満たすことができます。
実装
次のセクションでは、MongoDB 固有のメッセージ キューの実装について説明します。 NodeJS、Go、All Quiet の場合は C# など、好みのプログラミング言語に適したクライアント ライブラリが必要ですが、ここで共有する概念はプラットフォームに依存しません。
キュー
利用したい各キューは、MongoDB データベース内の専用のコレクションとして表されます。
メッセージモデル
処理されたメッセージの例を次に示します。
{ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }
メッセージの各プロパティを見てみましょう。
_id
_id
フィールドは、MongoDB の正規の一意識別子プロパティです。ここでは、 ObjectIdではなくNumberLong
含まれています。次の理由から、 ObjectId
の代わりにNumberLong
が必要です。
ObjectId値は時間の経過とともに増加しますが、必ずしも単調であるとは限りません。その理由は次のとおりです。
- 時間解像度は 1 秒のみ含まれるため、同じ秒内に作成された ObjectId 値の順序は保証されません。
- クライアントによって生成されるため、システム クロックが異なる場合があります。
C# 実装では、ミリ秒の精度で ID を生成し、挿入時間に基づいて順序付けを保証します。マルチコンシューマ環境 (RabbitMQ と同様) では厳密な処理順序は必要ありませんが、コンシューマが 1 つだけで動作する場合は FIFO 順序を維持することが重要です。 ObjectId を使用してこれを実現することは現実的ではありません。これが重要でない場合でも、ObjectId を使用できます。
ステータス
Statuses プロパティは、メッセージ処理履歴を含む配列で構成されます。インデックス 0 には、インデックス作成にとって重要な現在のステータスが表示されます。
ステータス オブジェクト自体には、次の 3 つのプロパティが含まれています。
-
Status
: 「エンキュー済み」、「処理中」、「処理済み」、または「失敗」のいずれかです。 -
Timestamp
: 現在のタイムスタンプを取得します。 -
NextReevaluation
: 次の評価がいつ行われるかを記録します。これは、再試行と将来のスケジュールされた実行の両方に不可欠です。
ペイロード
このプロパティには、メッセージの特定のペイロードが含まれます。
メッセージをキューに入れる
メッセージの追加は、ステータスが「エンキュー済み」に設定されたコレクションへの簡単な挿入操作です。
- 即時処理するには、
NextReevaluation
をnull
に設定します。 - 今後の処理のために、メッセージを処理する場合は、
NextReevaluation
将来のタイムスタンプに設定します。
db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } });
メッセージのデキュー
デキューは少し複雑ですが、それでも比較的簡単です。 MongoDB の同時アトミック読み取りおよび更新機能に大きく依存しています。
MongoDB のこの重要な機能により、次のことが保証されます。
- 各メッセージは 1 回だけ処理されます。
- 複数のコンシューマーがメッセージを同時に安全に処理できます。
db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
したがって、状態「Enqueued」にある 1 つのメッセージを読み取り、同時に位置 0 にステータス「Processing」を設定することでメッセージを変更します。この操作はアトミックであるため、メッセージが別のコンシューマによって取得されないことが保証されます。 。
メッセージを処理済みとしてマークする
メッセージの処理が完了したら、メッセージの ID を使用してメッセージのステータスを「処理済み」に更新するだけです。
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
メッセージを失敗としてマークする
処理が失敗した場合は、それに応じてメッセージにマークを付ける必要があります。多くの場合、メッセージの処理を再試行することが必要になる場合があります。これは、メッセージを再度キューに入れることで実現できます。多くのシナリオでは、処理エラーの性質に応じて、10 秒などの特定の遅延の後にメッセージを再処理することが合理的です。
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } });
デキューループ
私たちは、実際には単なる MongoDB コレクションである「キュー」に対して項目を簡単にエンキューしたり、そこからデキューしたりする方法を確立しました。 NextReevaluation
フィールドを利用して、将来のメッセージを「スケジュール」することもできます。
欠けているのは、定期的にデキューする方法です。コンシューマは、ある種のループでfindAndModify
コマンドを実行する必要があります。簡単なアプローチは、メッセージをデキューして処理する無限ループを作成することです。この方法は簡単で効果的です。ただし、データベースとネットワークにかなりの負荷がかかります。
別の方法は、ループの反復間に遅延 (たとえば、100ms) を導入することです。これにより負荷は大幅に軽減されますが、デキューの速度も低下します。
この問題の解決策は、MongoDB が変更ストリームと呼ぶものです。
MongoDB 変更ストリーム
変更ストリームとは何ですか? MongoDB の担当者よりもうまく説明することはできません。
変更ストリームにより、アプリケーションはリアルタイムのデータ変更にアクセスできるようになります […]。アプリケーションは変更ストリームを使用して、単一のコレクション […] のすべてのデータ変更をサブスクライブし、それらに即座に対応できます。
素晴らしい!私たちができることは、キュー コレクション内で新しく作成されたドキュメントをリッスンすることです。これは、事実上、新たにキューに入れられたメッセージをリッスンすることを意味します。
これは非常に単純です:
const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
スケジュールされたメッセージと孤立したメッセージ
ただし、変更ストリームのアプローチは、スケジュールされたメッセージと孤立したメッセージの両方に対しては機能しません。これは、明らかにリッスンできる変更がないためです。
- スケジュールされたメッセージは、ステータスが「Enqueued」、「NextReevaluation」フィールドが将来に設定された状態でコレクション内に存在するだけです。
- 孤立したメッセージは、コンシューマ プロセスが終了したときに「処理中」ステータスにあったメッセージです。これらは「処理中」ステータスでコレクションに残りますが、コンシューマがステータスを「処理済み」または「失敗」に変更することはありません。
これらのユースケースでは、単純なループに戻る必要があります。ただし、反復間にかなり余裕のある遅延を使用することもできます。
それをまとめる
MySQL 、 PostgreSQL 、 MongoDB (私はこれも従来型だと考えています) などの「従来型」データベースは、今日では信じられないほど強力です。正しく使用すれば (インデックスが最適化されていることを確認してください!)、従来のホスティング プラットフォームでは迅速で、驚くほど拡張でき、コスト効率が高くなります。
多くのユースケースは、データベースと好みのプログラミング言語だけを使用して対処できます。 「適切な仕事に適したツール」、つまり Redis、Elasticsearch、RabbitMQ などの多様なツールのセットを維持する必要は必ずしも必要ありません。多くの場合、メンテナンスのオーバーヘッドはそれだけの価値がありません。
提案されたソリューションは、たとえば RabbitMQ のパフォーマンスには及ばないかもしれませんが、通常は十分であり、スタートアップにとって大きな成功をもたらすポイントまで拡張できます。
ソフトウェア エンジニアリングでは、トレードオフを回避することが重要です。賢明にお選びください。