Original article available at https://habrahabr.ru/company/mailru/blog/341498/
Tarantool is not just a database, it’s an application server with a database on board, which makes it a tool of choice for quickly and easily implementing certain things people usually spend a lot of time on.
I decided to write this article after reading about a RabbitMQ-based user notification service.
Quite a lot of IT specialists face similar problems, and I’d like to share with you how I solve them.
A few years ago we had a client who wanted to receive real-time (with network-defined latency) notifications about certain events in a web application.
Let’s take a look at the requirements:
Given this incomplete information, let’s make a few assumptions and guesstimates. If this application is a chat with a million users constantly writing something and it takes a single user 10 seconds to create a message, then a million users generate 100,000 messages per second.
Here arises the first question: what do we need to deliver?
This is a classical dilemma of all queues, aggregators, and event servers: what’s a storage unit? So let’s introduce a couple of definitions:
An event may have some data associated with it:
Here’s an example: user 345 placed an order to transport cargo X from point A to point B and paid for it with a card Z and so on.
Let the information about who (event source) placed an order (fact of an event) constitute an event, then our data would contain the starting point, the type of cargo, and other information about the user, the order, and the cargo. In other words, an event is a structure that describes the fact of an event, whereas data is all the rest.
Distinction between an event and data is somewhat arbitrary, but still.
Here’s an example of an event:
User 345 placed order 12345. At shipment time, the order had the created status (this is superfluous information).
Below I’ll try to formulate some rule of thumb for making the right architectural decision:
When designing queues, event servers, or aggregators, servers should handle events, not data.
But let me reiterate that: the dividing line between an event and data is blurry: in the example above, an event contains a part of data (the status field).
Back to the task: we’re building an event server and therefore can estimate the traffic. Say, on average, an event is a JSON hash 4 to 10 elements long (that is 60–160 bytes of text). So, given a million users generating 100,000 messages per second, the event flow would be 6 to 16 MB/s. To handle all this traffic, a single-host network with a 200 Mb/s throughput should be enough.
Now let’s try to calculate how many resources it would take to deliver these events to a million users. Architectures may differ, but there always exist some common principles. If one message needs to be delivered to a million users, it’s highly likely a bug in the architecture (although it might be a legitimate use case). We need to come up with some average estimate. Let’s assume that a single message is delivered to 10 users: in a typical messenger, you don’t often have more than 10 friends online; in case of an order, there are rarely more than contractors, and so on.
If we do the math, it takes approximately 2 to 3 Gb of traffic to deliver notifications to our users. Hence another key consideration: this task can be solved on a single server with a 10 Gb network interface controller and about 10 GB of RAM (if we set the caching interval to 10 minutes). Now that we know this system can be implemented on just one server, let’s try to build a real scalable distributed system.
One of the fastest ways of storing cached data on disk is by using a write-ahead log (WAL): data goes to the RAM cache and is then written to the WAL. Since data is always only appended to the WAL, it’s possible to leverage almost 100% of disk write throughput this way. I won’t discuss the the downsides of WALs here, but let me say they’re well-suited for replication.
Tarantool DBMS has just such a WAL: it not only allows replicating data to another host, but also offers two-way asynchronous master-master replication. Tarantool benchmarks run on a 2012 modest-spec laptop with 220-byte messages yield the performance of 160–180K writes per second, which is almost twice more than our task demands.
There are numerous ways of data delivery (we’ll discuss them later). For now, let’s focus on the algorithmic side of the delivery problem.
For data delivery to function properly in the real world, we’ll list several requirements it must meet:
Based on these requirements, through trial and error, we’ve arrived at the following scheme:
Given this algorithm, we have two candidates for organizing data transport: regular long-polling (each request carries the number of the last received message) or web sockets (the number of the last received message is sent only on reconnect).
After some experimentation, we decided that each event should be uniquely identified by an event key, which, in the general case, is just a string ID of an event. Since event keys are often a combination of several different IDs, let each event key be represented by some array of string IDs.
A quick example: user 123 writes a message to chat 345 and mentions user 567. As a result, we generate an event with a key [ ‘chat’, 345 ], which is delivered to all users in chat 345, and another event with a key [‘user’, 567], which only user 567 receives.
A more detailed view of these events may look like this:
“key”: [ “chat”, 345 ],
“key”: [ “user”, 567 ],
Now we’ve come to how message keys are formed.
It doesn’t make much sense (and may even do more harm than good sometimes) to have lots of message keys for more or less similar things.
New keys should only be allocated for significantly different entities.
Example 1: it makes sense to use a single key for these tasks:
Example 2: it makes sense to use different keys for these tasks:
That is to say, a message key should approximately define message recipients and be treated similarly to an address on an envelope.
Now that we’ve decided on the data/event scheme, it’s time to dive deeper into the implementation details.
We’ll store messages in a flat table with the following columns:
We’ll also need two indices for this table:
The resulting schema is available here.
Tarantool DBMS lets us easily create pub/sub applications with a built-in fiber module.
Each client request is processed in a separate fiber (a lightweight type of thread). This paradigm makes it a breeze to handle tens of thousands of connections on a single CPU core. That said:
The subscribe algorithm for one key looks like this:
The push algorithm is as follows:
All the server logic, including scaling out to several CPUs/servers, was implemented in less than 500 lines of Lua code.
This system, based on three Tarantool instances that run on a single virtual (OpenVZ node) server, utilizes 10% of each of the two allocated CPU cores and currently serves about 50,000 users.
If the calculations are correct, one such server is capable of handling around 500,000 users. One or two more CPU cores might be needed, though.
Problems associated with the number of sockets per host that are mentioned in the article above are resolved the same way.
Each master instance has a background daemon that removes old messages from the database. The algorithm is pretty straightforward:
We started building this system at the time of Tarantool 1.5’s release, which didn’t have two-way asynchronous replication. That’s why its architecture looks like this:
Master and replicas are fully identical instances, except that (for now) messages are pushed only to one server.
That is to say, the system is currently scaled by adding new replicas, and the peak performance is capped by the performance of the master server (for a modern server unit, it’s about 400–500K messages per second).
Since Tarantool 1.6 has two-way master-master replication, this feature can be used for scaling the system as well. What I have in mind (yet to be implemented):
Other than that, the algorithm stays the same. This way the system can be scaled out to 10–30 master servers (which translates to 4–20 million outgoing messages per second) without making any substantial changes to the architecture.
It includes a simple asynchronous HTTP server for long polling or a more complex asynchronous server for web sockets + long polling. It takes about 200 lines of code to implement this server in Perl + AnyEvent.
We don’t use client authorization on the event server (in this context, a client is a web site user), because we don’t think it’s necessary.
But, generally speaking, it’s easy to implement: add to each message a key-value pair that defines “who is allowed to see this data” and compare this pair to the information contained in request cookies.
Since our server operates on events and not data, we didn’t need to introduce authorization so far.
We started working in this area back when web sockets only started to emerge and the first standards were being drafted. For this reason, regular HTTP long polling has been used for data transport for a very long time. Experimenting with web sockets in mobile networks has shown that:
That’s why, when talking about interactive web sites in the context of mobile networks, you can say web sockets are gaining traction (supported by 70% of devices), but they still have a long way to go until universal adoption (30% is quite a lot).
If a web project includes a queue and an event server, many things that tend to cause problems in other architectures can easily be dealt with.
One such thing, and quite obvious at that, is a chat described above. Another elegant application of an event server is working with long-running or resource-intensive algorithms (report generation, statistics gathering, and even video decoding).