Reliable Messaging in Distributed Systems

Written by fairday | Published 2024/03/18


Dual write problem

Building a reliable, highly available, scalable distributed system requires adherence to specific techniques, principles, and patterns. The design of such systems involves addressing a myriad of challenges. Among the most prevalent and foundational issues is the dual write problem.

The “dual write problem” is a challenge that arises in distributed systems, mainly when dealing with multiple data sources or databases that need to be kept in sync. It refers to the difficulty of ensuring that data changes are consistently written to various data stores, such as databases or caches, without introducing issues like data inconsistencies, conflicts, or performance bottlenecks.

The microservices architecture and the database-per-service pattern bring many benefits, such as independent deployment and scaling, isolated failures, and a potential boost in development velocity. However, some operations require changes across multiple microservices, forcing you to find a reliable way to keep them consistent.

Almost a real example

Let's consider a scenario in which our domain involves accepting loan applications, assessing them, and then sending notification alerts to customers.

In the spirit of the single responsibility principle, Conway’s law, and the domain-driven design approach, after several event-storming sessions, the whole domain was split into three subdomains with defined bounded contexts having clear boundaries, domain models, and ubiquitous language.

The first is tasked with onboarding and compiling new loan applications. The second system evaluates these applications and makes decisions based on the data provided. This assessment process, including KYC/KYB, antifraud, and credit risk checks, can be time-consuming, necessitating the ability to handle thousands of applications simultaneously. Consequently, this functionality has been delegated to a dedicated microservice with its own database, allowing for independent scaling.

Furthermore, these subsystems are managed by two different teams, each with its own release cycles, service level agreements (SLA), and scalability requirements.

Lastly, a specialised notification service is in place to send alerts to customers.


Here is a refined description of the primary use case of the system:

  1. A customer submits a loan application.
  2. The Loan Application Service records the new application with a "Pending" status and initiates the assessment process by forwarding the application to the Assessment Service.
  3. The Assessment Service evaluates the incoming loan application and subsequently informs the Loan Application Service of the decision.
  4. Upon receiving the decision, the Loan Application Service updates the loan application status accordingly and triggers the Notifications Service to inform the customer of the outcome.
  5. The Notifications Service processes this request and dispatches notifications to the customer via email, SMS, or other preferred communication methods, according to the customer's settings.

It is a pretty simple and primitive system at first glance, but let’s dive into how the Loan Application Service processes the 'submit loan application' command.


We can consider two approaches for service interactions:

  1. First-Local-Commit-Then-Publish: In this approach, the service updates its local database (commits) and then publishes an event or message to other services.

  2. First-Publish-Then-Local-Commit: Conversely, this method involves publishing an event or message before committing the changes to the local database.

Both methods have their drawbacks and are only partially fail-safe for communication in distributed systems.

This is a sequence diagram of applying the first approach.

In this scenario, the Loan Application Service employs the First-Local-Commit-Then-Publish approach: it first commits a transaction and then attempts to send a message to another system. However, this process is susceptible to failure if, for example, there are network issues, the Assessment Service is unavailable, or the Loan Application Service encounters an Out of Memory (OOM) error and crashes. In such cases, the message would be lost, leaving the Assessment Service unaware of the new loan application unless additional measures are implemented.

And the second one.


In the First-Publish-Then-Local-Commit scenario, the Loan Application Service faces more significant risks. It might inform the Assessment Service about a new application but fail to save this update locally due to problems like database issues, memory errors, or code bugs. This approach can lead to significant inconsistencies in data, which could cause serious problems, depending on how the Assessment Service handles incoming applications.
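Both failure modes can be reproduced with a small simulation. The sketch below is illustrative only: the class, the method names, and the in-memory "database" and "broker" are assumptions made for this example, not part of the system described above.

```python
class CrashError(Exception):
    """Simulates a crash or an unavailable dependency between the two writes."""


class LoanApplicationService:
    def __init__(self):
        self.database = {}   # local store: application id -> status
        self.published = []  # messages that reached the Assessment Service

    def _commit(self, app_id, fail=False):
        if fail:
            raise CrashError("database unavailable")
        self.database[app_id] = "Pending"

    def _publish(self, app_id, fail=False):
        if fail:
            raise CrashError("broker unavailable")
        self.published.append(app_id)

    def commit_then_publish(self, app_id, fail_publish=False):
        self._commit(app_id)
        self._publish(app_id, fail=fail_publish)

    def publish_then_commit(self, app_id, fail_commit=False):
        self._publish(app_id)
        self._commit(app_id, fail=fail_commit)


def run(step):
    """Run one request and swallow the simulated crash."""
    try:
        step()
    except CrashError:
        pass


service = LoanApplicationService()
# Approach 1: the application is saved, but the message is lost.
run(lambda: service.commit_then_publish("app-1", fail_publish=True))
# Approach 2: the message is sent, but the application is never saved.
run(lambda: service.publish_then_commit("app-2", fail_commit=True))

print(service.database)   # {'app-1': 'Pending'}
print(service.published)  # ['app-2']
```

After both runs, the local database and the outside world disagree in opposite directions: "app-1" exists locally but was never announced, while "app-2" was announced but does not exist.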


Therefore, we must identify a solution that offers a robust mechanism for publishing events to external consumers. But, before delving into potential solutions, we should first clarify the types of message delivery guarantees achievable in distributed systems.

Message delivery guarantees

There are four types of guarantees we could achieve.

  1. No guarantees
    There is no guarantee that the message will be delivered to the destination. The First-Local-Commit-Then-Publish approach falls precisely into this category. Consumers may receive messages once, multiple times, or never at all.

  2. At most once delivery
    At most once delivery means that the message will be delivered to the destination at most once. The First-Local-Commit-Then-Publish approach can also be implemented this way by limiting the retry policy to a single attempt.

  3. At least once delivery
    Consumers will receive and process every message but may receive the same message more than once.

  4. Exactly once delivery
    Exactly once delivery means that the consumer will receive the message effectively once. Technically, this is achievable with Kafka transactions and specific idempotent implementations of the producer and consumer.

In most cases, an 'at least once' delivery guarantee addresses many issues by ensuring that no message is lost, at the cost of requiring idempotent consumers. However, given that network failures are unavoidable, consumer logic must be idempotent anyway to avoid processing duplicate messages, regardless of the producer's guarantees. This requirement is therefore less a drawback than a reflection of reality.
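As a rough illustration of why idempotency matters under 'at least once' delivery, the sketch below redelivers one message several times to a consumer that remembers the ids it has already handled. The names (`NotificationConsumer`, `deliver_at_least_once`, `ack_lost_times`) are hypothetical, and a real consumer would keep the processed ids in durable storage rather than in memory.

```python
class NotificationConsumer:
    """Idempotent consumer: remembers the ids of messages it has processed."""

    def __init__(self):
        self.processed_ids = set()
        self.sent_notifications = []

    def handle(self, message):
        if message["id"] in self.processed_ids:
            return False  # duplicate delivery: skip the side effect
        self.sent_notifications.append(message["payload"])
        self.processed_ids.add(message["id"])
        return True


def deliver_at_least_once(consumer, message, ack_lost_times):
    """Redeliver the message until an acknowledgement gets through.

    ack_lost_times simulates how many acknowledgements are lost on the way
    back to the sender, each of which triggers a redelivery.
    """
    deliveries = 0
    while True:
        consumer.handle(message)
        deliveries += 1
        if deliveries > ack_lost_times:
            return deliveries


consumer = NotificationConsumer()
message = {"id": "msg-1", "payload": "Loan approved"}
deliveries = deliver_at_least_once(consumer, message, ack_lost_times=2)

print(deliveries)                   # 3
print(consumer.sent_notifications)  # ['Loan approved']
```

The message is delivered three times, but the customer is notified only once.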

Solutions

There are plenty of solutions to this problem, which have their advantages and disadvantages.

Two-phase commit

According to Wikipedia, the Two-Phase Commit (2PC) is a distributed transaction protocol used in computer science and database management systems to ensure the consistency and reliability of distributed transactions. It’s designed for situations where multiple resources (e.g., databases) need to participate in a single transaction, and it ensures that either all of them commit the transaction or all of them abort it, thereby maintaining data consistency. It sounds like exactly what we need, but Two-Phase Commit has several drawbacks:

  • If one participating resource becomes unresponsive or experiences a failure, the entire process can be blocked until the issue is resolved. This can lead to potential performance and availability problems.
  • Two-Phase Commit doesn’t provide built-in fault tolerance mechanisms. It relies on external mechanisms or manual intervention to handle failures.
  • Not all modern databases support Two-Phase Commit.
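The two phases of the protocol can be sketched in a few lines. This is a simplified, in-memory model with hypothetical names (`Participant`, `two_phase_commit`). It shows only the voting and completion phases and deliberately leaves out timeouts, logging, and recovery, which is where the drawbacks listed above come from.

```python
class Participant:
    """A resource taking part in the transaction, e.g. a database or a broker."""

    def __init__(self, name, can_commit=True):
        self.name = name
        self.can_commit = can_commit
        self.state = "initial"

    def prepare(self):
        # Phase 1: the participant votes on whether it is able to commit.
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def commit(self):
        self.state = "committed"

    def rollback(self):
        self.state = "aborted"


def two_phase_commit(participants):
    # Phase 1 (voting): the coordinator asks every participant to prepare.
    votes = [p.prepare() for p in participants]
    # Phase 2 (completion): commit only if every vote was "yes".
    if all(votes):
        for p in participants:
            p.commit()
        return True
    for p in participants:
        p.rollback()
    return False


database = Participant("loan-database")
broker = Participant("message-broker", can_commit=False)
committed = two_phase_commit([database, broker])

print(committed)       # False
print(database.state)  # aborted
```

In this model, a participant that never answers `prepare()` would stall the coordinator indefinitely, which corresponds to the blocking behaviour described in the first drawback.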

Shared database

The most obvious solution for a microservices architecture is to apply the shared database pattern (sometimes considered an anti-pattern). The approach is intuitive: if you need transactional consistency across tables that would otherwise live in different databases, use one shared database for these microservices.


The drawbacks of this approach include introducing a single point of failure, inhibiting independent database scaling, and limiting the ability to use different database solutions best suited for specific requirements and use cases. Additionally, modifications to the microservices codebases would be necessary to support such a form of distributed transaction.

Transactional outbox

The 'transactional outbox' is a design pattern used in distributed systems to ensure reliable message propagation, even in the face of unreliable messaging systems. It involves storing events in a designated 'OutboxEvents' table within the same transaction as the operation itself. This approach aligns well with the ACID properties of relational databases. In contrast, many NoSQL databases do not fully support ACID properties and instead follow the BASE philosophy, which, within the trade-offs described by the CAP theorem, prioritises availability and eventual consistency over strict consistency.

A transactional outbox provides an at least once guarantee and can be implemented in two ways:

  1. Transaction log tailing

  2. Polling publisher

The transaction log tailing approach relies on database-specific solutions such as CDC (Change Data Capture). The main drawbacks of that approach are:

  • Database specific solutions

  • Increased latency due to specifics of CDC implementations

Another method is the Polling Publisher, which offloads the outbox by periodically polling the outbox table and publishing any unsent events. The primary drawback of this approach is the potential for increased database load, which can lead to higher costs. Furthermore, not all NoSQL databases support efficient querying for specific document segments. Extracting entire documents can, therefore, result in performance degradation.

Here is a small sequence diagram explaining how it works.
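In code, the polling publisher variant might look like the minimal sketch below. It uses SQLite from the Python standard library purely for illustration. The 'OutboxEvents' table name comes from the text above, while the column names, the `LoanApplications` table, and the `publish` callback (a stand-in for a real broker client) are assumptions.

```python
import json
import sqlite3


def create_schema(conn):
    conn.execute(
        "CREATE TABLE LoanApplications (id TEXT PRIMARY KEY, status TEXT NOT NULL)"
    )
    conn.execute(
        "CREATE TABLE OutboxEvents ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "type TEXT NOT NULL, payload TEXT NOT NULL, "
        "published INTEGER NOT NULL DEFAULT 0)"
    )


def submit_loan_application(conn, app_id):
    # One local transaction covers both the business row and the outbox row,
    # so either both are stored or neither is.
    with conn:
        conn.execute(
            "INSERT INTO LoanApplications (id, status) VALUES (?, 'Pending')",
            (app_id,),
        )
        conn.execute(
            "INSERT INTO OutboxEvents (type, payload) VALUES (?, ?)",
            ("LoanApplicationSubmitted", json.dumps({"id": app_id})),
        )


def poll_and_publish(conn, publish, batch_size=10):
    """Publish unsent outbox events in order and return how many were sent."""
    rows = conn.execute(
        "SELECT id, type, payload FROM OutboxEvents "
        "WHERE published = 0 ORDER BY id LIMIT ?",
        (batch_size,),
    ).fetchall()
    sent = 0
    for event_id, event_type, payload in rows:
        # If publish raises, the row stays unpublished and is retried later.
        publish(event_type, json.loads(payload))
        with conn:
            conn.execute(
                "UPDATE OutboxEvents SET published = 1 WHERE id = ?", (event_id,)
            )
        sent += 1
    return sent


conn = sqlite3.connect(":memory:")
create_schema(conn)
submit_loan_application(conn, "app-1")

broker = []  # stand-in for a real message broker
first_poll = poll_and_publish(conn, lambda t, p: broker.append((t, p)))
second_poll = poll_and_publish(conn, lambda t, p: broker.append((t, p)))
print(first_poll, second_poll)  # 1 0
```

If the process crashes after `publish` succeeds but before the row is marked as published, the event is sent again on the next poll. That is why the pattern gives an at least once guarantee rather than exactly once.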

Listen to yourself

The primary challenge with the Transactional Outbox pattern lies in its dependency on database ACID properties. It might be straightforward in typical OLTP databases but poses challenges in the NoSQL realm. To address this, a potential solution is to leverage an append-only log (for instance, Kafka) from the very start of request processing.

Instead of directly processing the 'submit loan application' command, we immediately send it to an internal Kafka topic and then return an 'accepted' result to the client. However, since the command has most likely not been processed yet at that point, we cannot immediately inform the customer of the result. To manage this eventual consistency, we can employ techniques such as long polling, client-initiated polling, optimistic UI updates, or using WebSockets or Server-Sent Events for notifications. However, this is a distinct topic altogether, so let's return to our initial subject.

We sent the message to an internal Kafka topic. The Loan Application Service then consumes this message (the same command it received from the client) and begins processing. First, it executes some business logic. Only after this logic is successfully executed and the results are persisted does it publish new messages to a public Kafka topic.

Let us take a look at a bit of pseudo-code.

public async Task HandleAsync(SubmitLoanApplicationCommand command, ...)
{
  // First, run the (idempotent) business logic and persist the result
  var loanApplication = await _loanApplicationService.HandleCommandAsync(command, ...);
  // Then, send the new event to the public Kafka topic
  producer.Send(new LoanApplicationSubmittedEvent(loanApplication.Id));
  // Finally, commit the offset so that the command is not redelivered
  consumer.Commit();
}

What if the processing of the business logic fails? No worries, since the offset has not yet been committed, the message will be retried.

What if sending new events to Kafka fails? No worries, since the business logic is idempotent, it will not create a duplicate loan application. Instead, it will attempt to resend messages to the public Kafka topic.

What if messages are sent to Kafka, but the offset commit fails? No worries, since the business logic is idempotent, it won’t create a duplicate loan application. Instead, it will resend messages to the public Kafka topic and hope that the offset commit succeeds this time.
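The failure cases above can be simulated without a real broker. In the sketch below, `FakeTopic` is an in-memory stand-in for a Kafka topic, and the `fail_plan` argument injects a failure per attempt. All names are hypothetical and serve only to illustrate the retry behaviour.

```python
class FakeTopic:
    """In-memory stand-in for a Kafka topic (an assumption for illustration)."""

    def __init__(self):
        self.messages = []

    def send(self, message):
        self.messages.append(message)


class LoanApplicationHandler:
    def __init__(self, public_topic):
        self.applications = {}  # keyed by command id, which makes creation idempotent
        self.public_topic = public_topic

    def handle(self, command, fail_on_publish=False):
        # Idempotent business logic: a retried command never creates a duplicate.
        self.applications.setdefault(command["id"], {"status": "Pending"})
        if fail_on_publish:
            raise ConnectionError("public topic unavailable")
        self.public_topic.send(
            {"type": "LoanApplicationSubmitted", "id": command["id"]}
        )


def consume(internal_topic, handler, fail_plan):
    """Process every message, retrying until an attempt fully succeeds.

    fail_plan lists one injected failure per attempt:
    "publish", "commit", or None for no failure.
    """
    offset = 0  # last committed offset
    attempts = 0
    while offset < len(internal_topic.messages):
        failure = fail_plan[attempts] if attempts < len(fail_plan) else None
        attempts += 1
        try:
            handler.handle(
                internal_topic.messages[offset],
                fail_on_publish=(failure == "publish"),
            )
            if failure == "commit":
                raise ConnectionError("offset commit failed")
        except ConnectionError:
            continue  # offset not committed, so the same message is retried
        offset += 1  # commit the offset only after everything succeeded
    return attempts


internal, public = FakeTopic(), FakeTopic()
internal.send({"id": "cmd-1"})
handler = LoanApplicationHandler(public)
attempts = consume(internal, handler, fail_plan=["publish", "commit"])

print(attempts)                   # 3
print(len(handler.applications))  # 1
print(len(public.messages))       # 2
```

Three attempts create exactly one loan application, but the event reaches the public topic twice because the offset commit failed once after a successful send. Downstream consumers therefore still need to be idempotent.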

The main drawbacks of this approach include the added complexity associated with a new programming style, eventual consistency (since the client won’t immediately know the result), and the requirement for all business logic to be idempotent.

Event sourcing

What is event sourcing, and how could it be applied here? Event sourcing is a software architectural pattern used to model the state of a system by capturing all changes to its data as a series of immutable events. These events represent facts or state transitions and serve as the single source of truth for the system’s current state. So, technically, by implementing an event-sourced system, we already have all events in the EventStore, and consumers can use this EventStore as the single source of truth about what happened. There is no need for a database-specific solution for tracking changes, nor any concern about ordering. The only problem lies on the read side: to obtain the current state of an entity, all of its events must be replayed.
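The read-side cost can be seen in a minimal sketch. The `EventStore` class here is a hypothetical in-memory list, and the event names other than the submitted event are invented for this example. The current status of a loan application is never stored and is derived by replaying its events.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    """An immutable fact about a loan application."""
    application_id: str
    type: str


class EventStore:
    """Append-only, in-memory event store (an assumption for illustration)."""

    def __init__(self):
        self._events = []

    def append(self, event):
        self._events.append(event)

    def stream(self, application_id):
        # Events are returned in the order in which they were appended.
        return [e for e in self._events if e.application_id == application_id]


# Hypothetical event types and the status each one leads to.
STATUS_BY_EVENT = {
    "LoanApplicationSubmitted": "Pending",
    "LoanApplicationApproved": "Approved",
    "LoanApplicationRejected": "Rejected",
}


def current_status(store, application_id):
    """Rebuild the current status by replaying every event of the entity."""
    status = None
    for event in store.stream(application_id):
        status = STATUS_BY_EVENT[event.type]
    return status


store = EventStore()
store.append(Event("app-1", "LoanApplicationSubmitted"))
store.append(Event("app-2", "LoanApplicationSubmitted"))
store.append(Event("app-1", "LoanApplicationApproved"))

print(current_status(store, "app-1"))  # Approved
print(current_status(store, "app-2"))  # Pending
```

Every read walks the whole stream of the entity, so long-lived entities become increasingly expensive to load unless the replay is shortened in some way.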

Conclusion

In this article, we reviewed several approaches for building reliable messaging in distributed systems. There are several recommendations we might consider while building systems with these characteristics:

  1. Always develop idempotent consumers since network failure is unavoidable.
  2. Use the First-Local-Commit-Then-Publish approach carefully, with a clear understanding of the guarantee requirements.
  3. Never use the First-Publish-Then-Local-Commit approach since it may lead to severe data inconsistency in your system.
  4. If the choice of database is likely to change, or the technical strategy is to select the best storage solution for each problem, don’t build shared libraries that bind to database-specific solutions like CDC.
  5. Use the Transactional Outbox approach as a standard solution for achieving at least once guarantees.
  6. Consider using the Listen to yourself approach when NoSQL databases are used.

Next time, we will look at a more practical example of implementing a Transactional Outbox. See you!


Written by fairday | Hey, I am Alex, a dedicated Software Development Engineer with experience in the .NET environment and architecture
Published by HackerNoon on 2024/03/18