Vadim Popov

@Vadim.Popov

Tarantool: when it takes 500 lines of code to notify a million users

Original article available at https://habrahabr.ru/company/mailru/blog/341498/

Tarantool is not just a database, it’s an application server with a database on board, which makes it a tool of choice for quickly and easily implementing certain things people usually spend a lot of time on.

I decided to write this article after reading about a RabbitMQ-based user notification service.

Quite a lot of IT specialists face similar problems, and I’d like to share with you how I solve them.

A few years ago we had a client who wanted to receive real-time (with network-defined latency) notifications about certain events in a web application.

Let’s take a look at the requirements:

  • On average, we need to listen for 10 events on the page.
  • There are a million users.
  • Persistence is a must (we don’t want to lose events).

Given this incomplete information, let’s make a few assumptions and guesstimates. If this application is a chat with a million users constantly writing something and it takes a single user 10 seconds to create a message, then a million users generate 100,000 messages per second.

Here arises the first question: what do we need to deliver?

Events or data?

This is a classical dilemma of all queues, aggregators, and event servers: what’s a storage unit? So let’s introduce a couple of definitions:

  • Event is a minimal informational structure that stores information about the fact of an event.

An event may have some data associated with it:

  • Data is a full data set, including data weakly relevant to the event.

Here’s an example: user 345 placed an order to transport cargo X from point A to point B and paid for it with a card Z and so on.

Let the information about who (event source) placed an order (fact of an event) constitute an event, then our data would contain the starting point, the type of cargo, and other information about the user, the order, and the cargo. In other words, an event is a structure that describes the fact of an event, whereas data is all the rest.

Distinction between an event and data is somewhat arbitrary, but still.

Here’s an example of an event:

{
“type”: “order”,
“user”: 345,
“status”: “created”,
“orderid”: 12345
}

User 345 placed order 12345. At shipment time, the order had the created status (this is superfluous information).

Below I’ll try to formulate some rule of thumb for making the right architectural decision:

When designing queues, event servers, or aggregators, servers should handle events, not data.

But let me reiterate that: the dividing line between an event and data is blurry: in the example above, an event contains a part of data (the status field).

Back to the task: we’re building an event server and therefore can estimate the traffic. Say, on average, an event is a JSON hash 4 to 10 elements long (that is 60–160 bytes of text). So, given a million users generating 100,000 messages per second, the event flow would be 6 to 16 MB/s. To handle all this traffic, a single-host network with a 200 Mb/s throughput should be enough.

Now let’s try to calculate how many resources it would take to deliver these events to a million users. Architectures may differ, but there always exist some common principles. If one message needs to be delivered to a million users, it’s highly likely a bug in the architecture (although it might be a legitimate use case). We need to come up with some average estimate. Let’s assume that a single message is delivered to 10 users: in a typical messenger, you don’t often have more than 10 friends online; in case of an order, there are rarely more than contractors, and so on.

If we do the math, it takes approximately 2 to 3 Gb of traffic to deliver notifications to our users. Hence another key consideration: this task can be solved on a single server with a 10 Gb network interface controller and about 10 GB of RAM (if we set the caching interval to 10 minutes). Now that we know this system can be implemented on just one server, let’s try to build a real scalable distributed system.

Storing data

One of the fastest ways of storing cached data on disk is by using a write-ahead log (WAL): data goes to the RAM cache and is then written to the WAL. Since data is always only appended to the WAL, it’s possible to leverage almost 100% of disk write throughput this way. I won’t discuss the the downsides of WALs here, but let me say they’re well-suited for replication.

Tarantool DBMS has just such a WAL: it not only allows replicating data to another host, but also offers two-way asynchronous master-master replication. Tarantool benchmarks run on a 2012 modest-spec laptop with 220-byte messages yield the performance of 160–180K writes per second, which is almost twice more than our task demands.

Delivering data to a client

There are numerous ways of data delivery (we’ll discuss them later). For now, let’s focus on the algorithmic side of the delivery problem.

For data delivery to function properly in the real world, we’ll list several requirements it must meet:

  • It must be disconnection-tolerant (messages must not be lost when reconnecting after a disconnect).
  • The client must be thin (most of the logic should ideally be implemented on the server, not on the client).

Based on these requirements, through trial and error, we’ve arrived at the following scheme:

  • The client “remembers” (no need for persistence here) the number of the last received message.
  • This number is used when restoring the event server connection.

Given this algorithm, we have two candidates for organizing data transport: regular long-polling (each request carries the number of the last received message) or web sockets (the number of the last received message is sent only on reconnect).

Data/event scheme

After some experimentation, we decided that each event should be uniquely identified by an event key, which, in the general case, is just a string ID of an event. Since event keys are often a combination of several different IDs, let each event key be represented by some array of string IDs.

A quick example: user 123 writes a message to chat 345 and mentions user 567. As a result, we generate an event with a key [ ‘chat’, 345 ], which is delivered to all users in chat 345, and another event with a key [‘user’, 567], which only user 567 receives.

A more detailed view of these events may look like this:

{
“key”: [ “chat”, 345 ],
“data”: {
“type”: “new_message”,
“msgid”: 9876
}
}

and

{
“key”: [ “user”, 567 ],
“data”: {
“type”: “notice”,
“chatid”: 345,
“msgid”: 9876
}
}

Now we’ve come to how message keys are formed.

It doesn’t make much sense (and may even do more harm than good sometimes) to have lots of message keys for more or less similar things.

New keys should only be allocated for significantly different entities.

Example 1: it makes sense to use a single key for these tasks:

  • notifying all chat users about a new message;
  • notifying all chat users about a newly connected user.

Example 2: it makes sense to use different keys for these tasks:

  • notifying all chat users about a new message;
  • notifying a user of a chat X about a mention.

That is to say, a message key should approximately define message recipients and be treated similarly to an address on an envelope.

Implementation

Now that we’ve decided on the data/event scheme, it’s time to dive deeper into the implementation details.

We’ll store messages in a flat table with the following columns:

  • message ID (ever-incrementing sequence number);
  • time of event generation;
  • message key;
  • message data.

We’ll also need two indices for this table:

  • primary index on message ID. This index will be used for deleting old messages from the database;
  • secondary index for selecting messages by key (composite index consisting of a message key and a message ID).

The resulting schema is available here.

Tarantool DBMS lets us easily create pub/sub applications with a built-in fiber module.

Each client request is processed in a separate fiber (a lightweight type of thread). This paradigm makes it a breeze to handle tens of thousands of connections on a single CPU core. That said:

  • your code is not riddled with callbacks (which is the case with Node.js);
  • the C10k problem gets easily resolved.

The subscribe algorithm for one key looks like this:

  1. Check if there’s any data (new events) associated with a given key. If so, return the data.
  2. Add the current fiber to the list of fibers subscribed to the given key.
  3. Sleep for some time (so that mobile networks don’t disrupt the functioning of web sockets; 25 seconds was empirically found to work well).
  4. Respond to a user (response may be empty, more on that later).

The push algorithm is as follows:

  1. Write new event to the database.
  2. If there are any clients subscribed to the keys that have been added to the database, wake up their corresponding fiber(s).

All the server logic, including scaling out to several CPUs/servers, was implemented in less than 500 lines of Lua code.

This system, based on three Tarantool instances that run on a single virtual (OpenVZ node) server, utilizes 10% of each of the two allocated CPU cores and currently serves about 50,000 users.

If the calculations are correct, one such server is capable of handling around 500,000 users. One or two more CPU cores might be needed, though.

Problems associated with the number of sockets per host that are mentioned in the article above are resolved the same way.

Removing old messages

Each master instance has a background daemon that removes old messages from the database. The algorithm is pretty straightforward:

  1. Choose the oldest message (the one with the lowest ID).
  2. Check its creation time.
  3. If its lifetime hasn’t expired, wait the necessary amount of time.
  4. Delete the old message.
  5. Get back to step 1.

Scaling

We started building this system at the time of Tarantool 1.5’s release, which didn’t have two-way asynchronous replication. That’s why its architecture looks like this:

  • Master server (messages can be pushed to it);
  • Replicas (clients can connect to them).

Master and replicas are fully identical instances, except that (for now) messages are pushed only to one server.

That is to say, the system is currently scaled by adding new replicas, and the peak performance is capped by the performance of the master server (for a modern server unit, it’s about 400–500K messages per second).

Update plans

Since Tarantool 1.6 has two-way master-master replication, this feature can be used for scaling the system as well. What I have in mind (yet to be implemented):

  • Convert a message ID to an array: master server ID, message ID.
  • Between two reconnects, a client “remembers” this array of values.
  • PROFIT!

Other than that, the algorithm stays the same. This way the system can be scaled out to 10–30 master servers (which translates to 4–20 million outgoing messages per second) without making any substantial changes to the architecture.

Downsides (there always are some)

  1. Lua (greatest downside). It’s simple, but its limitations (1 GB of RAM per instance) force you to scale a bit earlier than you would otherwise have.
  2. Unfortunately, we haven’t made public the HTTP part that handles data transport (because it depends on our internal-only modules).

It includes a simple asynchronous HTTP server for long polling or a more complex asynchronous server for web sockets + long polling. It takes about 200 lines of code to implement this server in Perl + AnyEvent.

Client authorization

We don’t use client authorization on the event server (in this context, a client is a web site user), because we don’t think it’s necessary.

But, generally speaking, it’s easy to implement: add to each message a key-value pair that defines “who is allowed to see this data” and compare this pair to the information contained in request cookies.

Since our server operates on events and not data, we didn’t need to introduce authorization so far.

A few words about data transport

We started working in this area back when web sockets only started to emerge and the first standards were being drafted. For this reason, regular HTTP long polling has been used for data transport for a very long time. Experimenting with web sockets in mobile networks has shown that:

  1. An event (even an empty one) must be passed through a web socket once or twice a minute. Otherwise, mobile networks shut down the connection.
  2. There’s no difference between a web socket and long polling + keep-alive in this case.
  3. There are lots of mobile devices where browsers don’t support web sockets.

That’s why, when talking about interactive web sites in the context of mobile networks, you can say web sockets are gaining traction (supported by 70% of devices), but they still have a long way to go until universal adoption (30% is quite a lot).

Applications

If a web project includes a queue and an event server, many things that tend to cause problems in other architectures can easily be dealt with.

One such thing, and quite obvious at that, is a chat described above. Another elegant application of an event server is working with long-running or resource-intensive algorithms (report generation, statistics gathering, and even video decoding).

Suppose we want to decode users’ videos. It goes without saying that it’s a lengthy procedure and not suitable for an HTTP request handler. After uploading a video, a user wants to understand what’s going on. We’ll just enqueue a video decoding task so that the client-side JavaScript starts tracking events associated with it. The video conversion process will send out events as the task gradually nears completion. Based on these events, the user’s browser can display a neat and, what’s more important, accurate progress bar.

More by Vadim Popov

Topics of interest

More Related Stories