Building a reliable, highly available, scalable distributed system requires adherence to specific techniques, principles, and patterns. The design of such systems involves addressing a myriad of challenges. Among the most prevalent and foundational issues is the dual write problem.
The “dual write problem” is a challenge that arises in distributed systems, mainly when dealing with multiple data sources or databases that need to be kept in sync. It refers to the difficulty of ensuring that data changes are consistently written to various data stores, such as databases or caches, without introducing issues like data inconsistencies, conflicts, or performance bottlenecks.
The microservices architecture and the database-per-service pattern bring many benefits, such as independent deployment and scaling, isolated failures, and a potential boost in development velocity. However, some operations require changes across multiple microservices, forcing you to think about a reliable way to keep their data consistent.
Let's consider a scenario in which our domain involves accepting loan applications, assessing them, and then sending notification alerts to customers.
In the spirit of the single responsibility principle, Conway’s law, and the domain-driven design approach, after several event-storming sessions the whole domain was split into three subdomains, each with a defined bounded context, clear boundaries, its own domain model, and a ubiquitous language.
The first is tasked with onboarding and compiling new loan applications. The second system evaluates these applications and makes decisions based on the data provided. This assessment process, including KYC/KYB, antifraud, and credit risk checks, can be time-consuming, necessitating the ability to handle thousands of applications simultaneously. Consequently, this functionality has been delegated to a dedicated microservice with its own database, allowing for independent scaling.
Furthermore, these subsystems are managed by two different teams, each with its own release cycles, service level agreements (SLA), and scalability requirements.
Lastly, a specialised notification service is in place to send alerts to customers.
Here is a refined description of the primary use case of the system:
It is a pretty simple and primitive system at first glance, but let’s dive into how the Loan Application Service processes the submit loan application command.
We can consider two approaches for service interactions: First-Local-Commit-Then-Publish and First-Publish-Then-Local-Commit.
Both methods have their drawbacks and are only partially fail-safe for communication in distributed systems.
This is a sequence diagram of applying the first approach.
In this scenario, the Loan Application Service employs the First-Local-Commit-Then-Publish approach: it first commits the transaction and then attempts to notify the other system. This process is susceptible to failure if, for example, there are network issues, the Assessment Service is unavailable, or the Loan Application Service encounters an Out of Memory (OOM) error and crashes. In such cases the message is lost, and the Assessment Service never learns about the new loan application unless additional measures are implemented.
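To make the failure window concrete, here is a minimal sketch of this ordering; the repository and publisher abstractions and the type names are illustrative assumptions, not the actual implementation.

// Minimal sketch of First-Local-Commit-Then-Publish.
// ILoanApplicationRepository and IEventPublisher are hypothetical abstractions.
public async Task SubmitAsync(SubmitLoanApplicationCommand command)
{
    var application = LoanApplication.CreateFrom(command);

    // Step 1: the local transaction commits here.
    await _loanApplicationRepository.SaveAsync(application);

    // Step 2: if the process crashes, the broker is unreachable, or the network fails
    // right here, the event is lost and the Assessment Service is never told,
    // even though the application is already stored locally.
    await _eventPublisher.PublishAsync(new LoanApplicationSubmittedEvent(application.Id));
}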
And the second one.
In the First-Publish-Then-Local-Commit scenario, the Loan Application Service faces even greater risks. It might inform the Assessment Service about a new application but then fail to save the record locally due to a database issue, a memory error, or a code bug. This can lead to significant data inconsistencies, which could cause serious problems depending on how the Assessment Service handles incoming applications.
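For contrast, a sketch of the reversed ordering, using the same hypothetical abstractions as above:

// Minimal sketch of First-Publish-Then-Local-Commit.
public async Task SubmitAsync(SubmitLoanApplicationCommand command)
{
    var application = LoanApplication.CreateFrom(command);

    // Step 1: the event goes out first, so downstream services may already act on it.
    await _eventPublisher.PublishAsync(new LoanApplicationSubmittedEvent(application.Id));

    // Step 2: if this save fails (database outage, OOM, a bug), the Assessment Service
    // now knows about an application that does not exist in the local database.
    await _loanApplicationRepository.SaveAsync(application);
}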
Therefore, we must identify a solution that offers a robust mechanism for publishing events to external consumers. But, before delving into potential solutions, we should first clarify the types of message delivery guarantees achievable in distributed systems.
There are four types of delivery guarantee we could achieve: no guarantee, at most once, at least once, and exactly once.
In most cases, an 'at least once' delivery guarantee addresses the problem: messages are guaranteed to arrive, but they may arrive more than once, so consumers must be idempotent. Given unavoidable network failures, consumer logic has to be idempotent anyway, regardless of the producer's guarantees, so this requirement is less a drawback than a reflection of reality.
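Here is a sketch of what idempotency can look like on the consumer side. The natural idempotency key in this domain is the loan application id; the repository and event shape are assumed abstractions.

// Idempotent handling of a possibly duplicated LoanApplicationSubmittedEvent.
public async Task HandleAsync(LoanApplicationSubmittedEvent @event)
{
    // If an assessment already exists for this application, the event is a duplicate.
    if (await _assessmentRepository.ExistsForApplicationAsync(@event.LoanApplicationId))
        return; // safe to ignore: processing it again would change nothing

    var assessment = Assessment.StartFor(@event.LoanApplicationId);
    await _assessmentRepository.SaveAsync(assessment);
}

Deduplication can also be implemented with a processed-messages table or a unique constraint on the idempotency key; the important part is that handling the same message twice leaves the system in the same state.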
There are plenty of solutions to this problem, which have their advantages and disadvantages.
According to the description on Wikipedia:
Two-Phase Commit (2PC) is a distributed transaction protocol used in computer science and database management systems to ensure the consistency and reliability of distributed transactions. It’s designed for situations where multiple resources (e.g., databases) need to participate in a single transaction, and it ensures that either all of them commit the transaction or all of them abort it, thereby maintaining data consistency.
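Below is a toy sketch of the two phases from the coordinator's point of view. Real implementations (XA, MSDTC) add persistent logging, timeouts, and recovery; the participant interface here is an assumption made purely for illustration.

// Hypothetical participant contract for the sketch.
public interface IParticipant
{
    Task<bool> PrepareAsync(Guid transactionId);   // phase 1: vote to commit or abort
    Task CommitAsync(Guid transactionId);          // phase 2: make the change durable
    Task RollbackAsync(Guid transactionId);
}

public async Task RunTwoPhaseCommitAsync(IReadOnlyList<IParticipant> participants)
{
    var transactionId = Guid.NewGuid();

    // Phase 1: every participant must vote "yes" for the transaction to proceed.
    var votes = await Task.WhenAll(participants.Select(p => p.PrepareAsync(transactionId)));

    // Phase 2: commit everywhere or abort everywhere.
    if (votes.All(v => v))
        await Task.WhenAll(participants.Select(p => p.CommitAsync(transactionId)));
    else
        await Task.WhenAll(participants.Select(p => p.RollbackAsync(transactionId)));
}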
It sounds like exactly what we need, but Two-Phase Commit has several drawbacks: it is a blocking protocol in which participants hold locks while waiting for the coordinator's decision, the coordinator itself is a single point of failure, the extra round trips add latency, and many modern NoSQL databases and message brokers do not support it at all.
The most apparent solution for a microservices architecture is to apply a pattern (or sometimes even an anti-pattern): a shared database. The approach is very intuitive: if you need transactional consistency across multiple tables in different databases, just use one shared database for those microservices.
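With a single shared database, the cross-service change collapses into one local ACID transaction. A sketch, assuming PostgreSQL with the Npgsql driver and made-up table names:

public async Task SubmitWithSharedDatabaseAsync(Guid applicationId, Guid customerId, string connectionString)
{
    await using var connection = new NpgsqlConnection(connectionString);
    await connection.OpenAsync();
    await using var transaction = await connection.BeginTransactionAsync();

    await using (var cmd = connection.CreateCommand())
    {
        cmd.Transaction = transaction;
        // Tables "owned" by two different services live in the same database,
        // so both inserts commit or roll back together.
        cmd.CommandText =
            "INSERT INTO loan_applications (id, customer_id, status) VALUES (@id, @customer, 'SUBMITTED');" +
            "INSERT INTO assessments (loan_application_id, status) VALUES (@id, 'PENDING');";
        cmd.Parameters.AddWithValue("id", applicationId);
        cmd.Parameters.AddWithValue("customer", customerId);
        await cmd.ExecuteNonQueryAsync();
    }

    await transaction.CommitAsync();
}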
The drawbacks of this approach include introducing a single point of failure, inhibiting independent database scaling, and limiting the ability to use different database solutions best suited for specific requirements and use cases. Additionally, modifications to the microservices codebases would be necessary to support such a form of distributed transaction.
The 'transactional outbox' is a design pattern used in distributed systems to ensure reliable message propagation even in the face of unreliable messaging systems. It involves storing events in a designated 'OutboxEvents' table within the same transaction as the operation itself. This approach aligns well with the ACID properties of relational databases. In contrast, many NoSQL databases do not fully support ACID, opting instead for CAP-theorem trade-offs and the BASE philosophy, which prioritise availability and eventual consistency over strict consistency.
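Here is a minimal sketch of the write side, assuming EF Core and illustrative entity names: an OutboxEvent row is written in the same local transaction as the loan application itself.

public async Task HandleAsync(SubmitLoanApplicationCommand command)
{
    var application = LoanApplication.CreateFrom(command);

    var outboxEvent = new OutboxEvent
    {
        Id = Guid.NewGuid(),
        Type = nameof(LoanApplicationSubmittedEvent),
        Payload = JsonSerializer.Serialize(new LoanApplicationSubmittedEvent(application.Id)),
        OccurredAtUtc = DateTime.UtcNow,
        PublishedAtUtc = null // filled in once the event reaches the broker
    };

    // One local ACID transaction: either both rows are persisted or neither is.
    _dbContext.LoanApplications.Add(application);
    _dbContext.OutboxEvents.Add(outboxEvent);
    await _dbContext.SaveChangesAsync();
}

A separate process is then responsible for relaying rows from OutboxEvents to the broker; the approaches below differ in how that relay works.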
A transactional outbox provides an at-least-once guarantee and can be implemented with several approaches:
The transaction log tailing approach implies using database-specific solutions such as CDC (Change Data Capture). Its main drawbacks are the tight coupling to a particular database and its replication-log format, and the extra CDC infrastructure that has to be deployed, monitored, and operated alongside the services.
Another method is the Polling Publisher, which facilitates outbox offloading by polling the outbox table. The primary drawback of this approach is the potential for increased database load, which can lead to higher costs. Furthermore, not all No-SQL databases support efficient querying for specific document segments. Extracting entire documents can, therefore, result in performance degradation.
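A sketch of such a relay loop, assuming EF Core, the Confluent.Kafka client, and the OutboxEvent shape from the previous sketch; the topic name and polling interval are arbitrary placeholders.

public async Task PollOutboxAsync(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        // Read a small batch of not-yet-published events, oldest first.
        var batch = await _dbContext.OutboxEvents
            .Where(e => e.PublishedAtUtc == null)
            .OrderBy(e => e.OccurredAtUtc)
            .Take(100)
            .ToListAsync(cancellationToken);

        foreach (var outboxEvent in batch)
        {
            await _producer.ProduceAsync("loan-applications",
                new Message<string, string> { Key = outboxEvent.Id.ToString(), Value = outboxEvent.Payload });

            // A crash between ProduceAsync and this update re-publishes the event
            // on the next poll, which is why the guarantee is at least once.
            outboxEvent.PublishedAtUtc = DateTime.UtcNow;
        }

        await _dbContext.SaveChangesAsync(cancellationToken);
        await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
    }
}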
Here is a small sequence diagram explaining how it works.
The primary challenge with the Transactional Outbox pattern lies in its dependency on database ACID properties. It might be straightforward in typical OLTP databases but poses challenges in the NoSQL realm. To address this, a potential solution is to leverage the append log (for instance, Kafka) right from initiating request processing.
Instead of directly processing the 'submit loan application' command, we immediately send it to an internal Kafka topic and return an 'accepted' result to the client. However, since the command has not actually been processed yet, we cannot immediately inform the customer of the outcome. To manage this eventual consistency, we can employ techniques such as long polling, client-initiated polling, optimistic UI updates, or WebSockets and Server-Sent Events for notifications. That is a distinct topic altogether, though, so let's return to our initial subject.
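A sketch of the entry point under this approach, assuming an ASP.NET Core controller and the Confluent.Kafka producer; the topic and route names are made up for illustration.

[HttpPost("loan-applications")]
public async Task<IActionResult> SubmitAsync(SubmitLoanApplicationCommand command)
{
    // The command is only appended to the internal topic; no business logic runs here.
    await _producer.ProduceAsync("loan-application-commands", new Message<string, string>
    {
        Key = command.CustomerId.ToString(),
        Value = JsonSerializer.Serialize(command)
    });

    // 202 Accepted: the outcome will be delivered to the client later
    // (long polling, WebSockets, Server-Sent Events, and so on).
    return Accepted();
}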
We send the message to an internal Kafka topic. The Loan Application Service then consumes this message (the same command it received from the client) and begins processing. First, it executes the business logic; only after this logic succeeds and the results are persisted does it publish new messages to the public Kafka topic.
Let us take a look at a bit of pseudo-code.
public async Task HandleAsync(SubmitLoanApplicationCommand command, ...)
{
    // First, process the business logic
    var loanApplication = await _loanApplicationService.HandleCommandAsync(command, ...);

    // Then, send new events to the public Kafka topic
    producer.Send(new LoanApplicationSubmittedEvent(loanApplication.Id));

    // Then, commit the consumer offset
    consumer.Commit();
}
What if the processing of the business logic fails? No worries, since the offset has not yet been committed, the message will be retried.
What if sending new events to Kafka fails? No worries, since the business logic is idempotent, it will not create a duplicate loan application. Instead, it will attempt to resend messages to the public Kafka topic.
What if messages are sent to Kafka, but the offset commit fails? No worries, since the business logic is idempotent, it won’t create a duplicate loan application. Instead, it will resend messages to the public Kafka topic and hope that the offset commit succeeds this time.
The main drawbacks of this approach include the added complexity associated with a new programming style, eventual consistency (since the client won’t immediately know the result), and the requirement for all business logic to be idempotent.
What is event sourcing, and how could it be applied here? Event sourcing is a software architectural pattern used to model the state of a system by capturing all changes to its data as a series of immutable events. These events represent facts or state transitions and serve as the single source of truth for the system’s current state. So, technically, by implementing an event-sourced system we already have all events in the event store, and consumers can use that event store as the single source of truth about what happened. There is no need for a dedicated mechanism to track changes, and ordering is no longer a concern; the only problem sits on the read side, since obtaining the current state of an entity requires replaying all of its events.
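A minimal sketch of that idea, with simplified event and state types assumed for illustration: the event store holds only immutable facts, and the current state is rebuilt by folding over them.

public abstract record LoanApplicationEvent(Guid ApplicationId);
public record LoanApplicationSubmitted(Guid ApplicationId) : LoanApplicationEvent(ApplicationId);
public record LoanApplicationApproved(Guid ApplicationId) : LoanApplicationEvent(ApplicationId);
public record LoanApplicationRejected(Guid ApplicationId, string Reason) : LoanApplicationEvent(ApplicationId);

public record LoanApplicationState(Guid ApplicationId, string Status)
{
    // Replaying the full history yields the current state; this is the cost on the read side.
    public static LoanApplicationState Replay(IEnumerable<LoanApplicationEvent> history) =>
        history.Aggregate(
            new LoanApplicationState(Guid.Empty, "NONE"),
            (state, @event) => @event switch
            {
                LoanApplicationSubmitted e => state with { ApplicationId = e.ApplicationId, Status = "SUBMITTED" },
                LoanApplicationApproved => state with { Status = "APPROVED" },
                LoanApplicationRejected => state with { Status = "REJECTED" },
                _ => state
            });
}

In practice the replay cost is usually mitigated with periodic snapshots or precomputed read models.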
In this article, we reviewed several approaches to building reliable messaging in distributed systems. Here are several recommendations we might consider while building systems with these characteristics:
Next time, we will look at a more practical example of implementing a Transactional Outbox. See you!