paint-brush
Tại sao tôi xây dựng hàng đợi tin nhắn được hỗ trợ bởi MongoDBby@allquiet
10,368
10,368

Tại sao tôi xây dựng hàng đợi tin nhắn được hỗ trợ bởi MongoDB

All Quiet12m2023/08/27
Read on Terminal Reader

Bạn có thể tạo hàng đợi tin nhắn HA và hiệu suất bằng MongoDB vì nó cung cấp các hoạt động đọc/cập nhật nguyên tử đồng thời cũng như các luồng thay đổi.
featured image - Tại sao tôi xây dựng hàng đợi tin nhắn được hỗ trợ bởi MongoDB
All Quiet HackerNoon profile picture
0-item
1-item
2-item


Này👋


Tôi là Mads Quist, người sáng lập All Quiet . Chúng tôi đã triển khai hàng đợi tin nhắn tự tạo dựa trên MongoDB và tôi ở đây để nói về:

  1. Tại sao chúng tôi lại phát minh ra bánh xe
  2. Cách chúng tôi phát minh lại bánh xe


1. Tại sao chúng tôi lại phát minh ra bánh xe

Tại sao chúng ta cần xếp hàng tin nhắn?

All Quiet là một nền tảng quản lý sự cố hiện đại, tương tự như PagerDuty.


Nền tảng của chúng tôi yêu cầu các tính năng như:


  • Gửi email xác nhận kép không đồng bộ sau khi người dùng đăng ký
  • Gửi email nhắc nhở 24 giờ sau khi đăng ký
  • Gửi thông báo đẩy bằng Nhắn tin qua đám mây Firebase (FCM), việc này có thể không thành công do sự cố mạng hoặc tải. Vì thông báo đẩy rất quan trọng đối với ứng dụng của chúng tôi nên chúng tôi cần thử gửi lại chúng nếu có sự cố.
  • Chấp nhận email từ bên ngoài sự tích hợp của chúng tôi và xử lý chúng thành các sự cố. Quá trình này có thể thất bại, vì vậy chúng tôi muốn tách nó ra và xử lý từng tải trọng email trên hàng đợi.




Kho công nghệ của chúng tôi

Để hiểu các yêu cầu cụ thể của chúng tôi, điều quan trọng là phải hiểu rõ hơn về nền tảng công nghệ của chúng tôi:


  • Chúng tôi chạy một ứng dụng web nguyên khối dựa trên .NET Core 7.
  • Ứng dụng .NET Core chạy trong vùng chứa Docker.
  • Chúng tôi chạy nhiều container song song.
  • Phiên bản HAProxy phân phối các yêu cầu HTTP đồng đều cho từng vùng chứa, đảm bảo thiết lập có tính khả dụng cao.
  • Chúng tôi sử dụng MongoDB làm cơ sở dữ liệu cơ bản, được sao chép trên các vùng khả dụng.
  • Tất cả các thành phần trên đều được AWS lưu trữ trên máy ảo EC2 chung.

Tại sao chúng tôi lại phát minh ra bánh xe

  • Chúng tôi mong muốn một cơ chế xếp hàng đơn giản có thể chạy đồng thời trong nhiều quy trình trong khi vẫn đảm bảo rằng mỗi tin nhắn chỉ được xử lý một lần.
  • Chúng tôi không cần mẫu pub/sub.
  • Chúng tôi không hướng tới một hệ thống phân tán phức tạp dựa trên nguồn cung ứng CQRS/sự kiện bởi vì bạn biết đấy, quy tắc đầu tiên của hệ thống phân tán là không phân phối các tệp .
  • Chúng tôi muốn giữ mọi thứ đơn giản nhất có thể, tuân theo triết lý lựa chọn " công nghệ nhàm chán ".


Cuối cùng, đó là việc giảm thiểu số lượng bộ phận chuyển động trong cơ sở hạ tầng của bạn. Chúng tôi mong muốn xây dựng các tính năng tuyệt vời cho những khách hàng xuất sắc của mình và bắt buộc phải duy trì dịch vụ của mình một cách đáng tin cậy. Việc quản lý một hệ thống cơ sở dữ liệu duy nhất để đạt được thời gian hoạt động hơn 5 giây là đủ thách thức. Vậy tại sao bạn lại phải gánh nặng việc quản lý một cụm HA RabbitMQ bổ sung?


Tại sao không sử dụng AWS SQS?

Vâng… các giải pháp đám mây như AWS SQS, Google Cloud Tasks hoặc Azure Queue Storage thật tuyệt vời! Tuy nhiên, chúng sẽ dẫn đến việc nhà cung cấp bị khóa. Chúng tôi chỉ mong muốn trở nên độc lập và tiết kiệm chi phí trong khi vẫn cung cấp dịch vụ có thể mở rộng cho khách hàng của mình.



2. Cách chúng tôi tái phát minh ra bánh xe

Hàng đợi tin nhắn là gì?

Hàng đợi tin nhắn là một hệ thống lưu trữ tin nhắn. Người tạo tin nhắn lưu trữ những tin nhắn này trong hàng đợi, sau đó được người tiêu dùng đưa ra hàng đợi để xử lý. Điều này cực kỳ có lợi cho việc tách các thành phần, đặc biệt khi xử lý tin nhắn là một nhiệm vụ tiêu tốn nhiều tài nguyên.


Hàng đợi của chúng ta nên thể hiện những đặc điểm gì?

  • Sử dụng MongoDB làm nơi lưu trữ dữ liệu của chúng tôi
  • Đảm bảo rằng mỗi tin nhắn chỉ được sử dụng một lần
  • Cho phép nhiều người tiêu dùng xử lý tin nhắn cùng một lúc
  • Đảm bảo rằng nếu quá trình xử lý tin nhắn không thành công thì có thể thử lại
  • Cho phép lập lịch sử dụng tin nhắn trong tương lai
  • Không cần đặt hàng đảm bảo
  • Đảm bảo tính sẵn sàng cao
  • Đảm bảo tin nhắn và trạng thái của chúng bền bỉ và có thể chịu được việc khởi động lại hoặc thời gian ngừng hoạt động kéo dài


MongoDB đã phát triển đáng kể trong những năm qua và có thể đáp ứng các tiêu chí được liệt kê ở trên.


Thực hiện

Trong các phần tiếp theo, tôi sẽ hướng dẫn bạn cách triển khai hàng đợi thư dành riêng cho MongoDB. Mặc dù bạn sẽ cần một thư viện máy khách phù hợp với ngôn ngữ lập trình ưa thích của mình, chẳng hạn như NodeJS, Go hoặc C# trong trường hợp All Quiet, nhưng các khái niệm tôi sẽ chia sẻ đều mang tính bất khả tri về nền tảng.


Hàng đợi

Mỗi hàng đợi bạn muốn sử dụng được thể hiện dưới dạng một bộ sưu tập chuyên dụng trong cơ sở dữ liệu MongoDB của bạn.

Mẫu tin nhắn

Dưới đây là ví dụ về tin nhắn được xử lý:

 { "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }


Chúng ta hãy xem xét từng thuộc tính của tin nhắn.


_nhận dạng

Trường _id là thuộc tính định danh duy nhất chuẩn của MongoDB. Ở đây, nó chứa NumberLong , không phải ObjectId . Chúng ta cần NumberLong thay vì ObjectId vì:



Mặc dù các giá trị ObjectId sẽ tăng theo thời gian nhưng chúng không nhất thiết phải đơn điệu. Điều này là do họ:

  • Chỉ chứa một giây độ phân giải thời gian, do đó các giá trị ObjectId được tạo trong cùng một giây không có thứ tự được đảm bảo và
  • Được tạo bởi các máy khách, có thể có đồng hồ hệ thống khác nhau.


Trong quá trình triển khai C#, chúng tôi tạo Id có độ chính xác đến mili giây và thứ tự được đảm bảo dựa trên thời gian chèn. Mặc dù chúng tôi không yêu cầu thứ tự xử lý nghiêm ngặt trong môi trường nhiều người tiêu dùng (tương tự như RabbitMQ), nhưng điều cần thiết là duy trì trật tự FIFO khi hoạt động chỉ với một người tiêu dùng. Đạt được điều này với ObjectId là không khả thi. Nếu điều này không quan trọng với bạn, bạn vẫn có thể sử dụng ObjectId.


Trạng thái

Thuộc tính Trạng thái bao gồm một mảng chứa lịch sử xử lý tin nhắn. Tại chỉ mục 0, bạn sẽ tìm thấy trạng thái hiện tại, điều này rất quan trọng cho việc lập chỉ mục.


Bản thân đối tượng trạng thái chứa ba thuộc tính:

  • Status : Có thể là "Đang xếp hàng", "Đang xử lý", "Đã xử lý" hoặc "Không thành công".
  • Timestamp : Cái này ghi lại dấu thời gian hiện tại.
  • NextReevaluation : Ghi lại thời điểm diễn ra lần đánh giá tiếp theo, điều này rất cần thiết cho cả lần thử lại và lần thực thi theo lịch trình trong tương lai.


Khối hàng

Thuộc tính này chứa tải trọng cụ thể của tin nhắn của bạn.


Xếp hàng tin nhắn

Thêm tin nhắn là một thao tác chèn đơn giản vào bộ sưu tập với trạng thái được đặt thành "Đã xếp hàng".

  • Để xử lý ngay lập tức, hãy đặt NextReevaluation thành null .
  • Để xử lý trong tương lai, hãy đặt NextReevaluation thành dấu thời gian trong tương lai khi bạn muốn xử lý thư của mình.
 db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } });


Xóa tin nhắn

Dequeuing phức tạp hơn một chút nhưng vẫn tương đối đơn giản. Nó phụ thuộc rất nhiều vào khả năng đọc và cập nhật nguyên tử đồng thời của MongoDB.


Tính năng thiết yếu này của MongoDB đảm bảo:

  • Mỗi tin nhắn chỉ được xử lý một lần.
  • Nhiều người tiêu dùng có thể xử lý tin nhắn đồng thời một cách an toàn.


 db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } });


Vì vậy, chúng tôi đang đọc một tin nhắn ở trạng thái “Đã xếp hàng” và đồng thời sửa đổi nó bằng cách đặt trạng thái “Đang xử lý” ở vị trí 0. Vì thao tác này là nguyên tử nên nó sẽ đảm bảo rằng tin nhắn sẽ không được người tiêu dùng khác nhận .


Đánh dấu thư là đã được xử lý

Sau khi quá trình xử lý thư hoàn tất, việc cập nhật trạng thái thư thành "Đã xử lý" bằng id của thư chỉ là một vấn đề đơn giản.

 db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });


Đánh dấu thư là không thành công

Nếu xử lý không thành công, chúng ta cần đánh dấu tin nhắn cho phù hợp. Thông thường, bạn có thể muốn thử xử lý lại tin nhắn. Điều này có thể đạt được bằng cách sắp xếp lại tin nhắn. Trong nhiều trường hợp, việc xử lý lại tin nhắn sau một khoảng thời gian trễ cụ thể, chẳng hạn như 10 giây, tùy thuộc vào bản chất của lỗi xử lý.


 db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } });


Vòng lặp loại bỏ hàng đợi

Chúng tôi đã thiết lập cách chúng tôi có thể dễ dàng xếp và loại bỏ các mục khỏi "hàng đợi" của mình, trên thực tế, đây chỉ đơn giản là một bộ sưu tập MongoDB. Chúng tôi thậm chí có thể "lên lịch" các tin nhắn trong tương lai bằng cách tận dụng trường NextReevaluation .


Điều còn thiếu là chúng ta sẽ xếp hàng thường xuyên như thế nào. Người tiêu dùng cần thực thi lệnh findAndModify trong một số loại vòng lặp. Một cách tiếp cận đơn giản là tạo ra một vòng lặp vô tận trong đó chúng ta sắp xếp hàng đợi và xử lý một thông báo. Phương pháp này rất đơn giản và hiệu quả. Tuy nhiên, nó sẽ gây áp lực đáng kể lên cơ sở dữ liệu và mạng.


Một giải pháp thay thế là đưa ra độ trễ, ví dụ: 100 mili giây, giữa các lần lặp vòng lặp. Điều này sẽ giảm đáng kể tải nhưng cũng sẽ làm giảm tốc độ xếp hàng.


Giải pháp cho vấn đề này là thứ mà MongoDB gọi là luồng thay đổi .


Luồng thay đổi MongoDB

Luồng thay đổi là gì? Tôi không thể giải thích điều đó tốt hơn những người ở MongoDB:


Luồng thay đổi cho phép ứng dụng truy cập các thay đổi dữ liệu theo thời gian thực […]. Các ứng dụng có thể sử dụng luồng thay đổi để đăng ký tất cả các thay đổi dữ liệu trên một bộ sưu tập duy nhất […] và phản ứng ngay lập tức với chúng.


Tuyệt vời! Những gì chúng ta có thể làm là nghe các tài liệu mới được tạo trong bộ sưu tập hàng đợi của mình, điều này có nghĩa là nghe các tin nhắn mới được xếp hàng đợi một cách hiệu quả


Điều này thật đơn giản:

 const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });



Tin nhắn được lên lịch và mồ côi

Tuy nhiên, cách tiếp cận luồng thay đổi không hoạt động đối với cả tin nhắn được lên lịch và tin nhắn mồ côi vì rõ ràng là không có thay đổi nào mà chúng ta có thể lắng nghe.


  • Các thư đã lên lịch chỉ nằm trong bộ sưu tập với trạng thái "Đã xếp hàng" và trường "Đánh giá lại tiếp theo" được đặt thành tương lai.
  • Tin nhắn mồ côi là những tin nhắn ở trạng thái "Đang xử lý" khi quy trình tiêu dùng của chúng ngừng hoạt động. Chúng vẫn nằm trong bộ sưu tập với trạng thái "Đang xử lý" nhưng sẽ không có người tiêu dùng nào thay đổi trạng thái của chúng thành "Đã xử lý" hoặc "Không thành công".


Đối với những trường hợp sử dụng này, chúng ta cần quay lại vòng lặp đơn giản của mình. Tuy nhiên, chúng ta có thể sử dụng độ trễ khá lớn giữa các lần lặp.


Gói nó lại

Các cơ sở dữ liệu "truyền thống", như MySQL , PostgreSQL hoặc MongoDB (mà tôi cũng xem là truyền thống), ngày nay cực kỳ mạnh mẽ. Nếu được sử dụng đúng cách (đảm bảo các chỉ mục của bạn được tối ưu hóa!), chúng sẽ hoạt động nhanh chóng, có quy mô ấn tượng và tiết kiệm chi phí trên các nền tảng lưu trữ truyền thống.


Nhiều trường hợp sử dụng có thể được giải quyết chỉ bằng cách sử dụng cơ sở dữ liệu và ngôn ngữ lập trình ưa thích của bạn. Không phải lúc nào cũng cần có "công cụ phù hợp cho công việc phù hợp", nghĩa là duy trì một bộ công cụ đa dạng như Redis, Elaticsearch, RabbitMQ, v.v. Thông thường, chi phí bảo trì là không đáng.


Mặc dù giải pháp được đề xuất có thể không phù hợp với hiệu suất của RabbitMQ, chẳng hạn, nhưng nó thường đủ và có thể mở rộng đến mức đánh dấu thành công đáng kể cho công ty khởi nghiệp của bạn.


Công nghệ phần mềm là điều hướng sự đánh đổi. Chọn của bạn một cách khôn ngoan.