Designing a data pipeline comes with its own set of problems. Take the lambda architecture, for example. In the batch layer, if data somewhere in the past is incorrect, you have to re-run the computation function on the whole (possibly terabytes large) dataset before the corrected result is absorbed into the serving layer and reflected in the views.
The speed layer alone has problems of data latency, data lag, data correctness, data completeness, and out of order data. Data might arrive later than expected (latency); there might be an unusual delay between one stage of the pipeline and the next (lag); some data might have gone missing or been lost somewhere in the pipeline (completeness); the data might have been corrupted at the source or at a particular place in the pipeline where a computation function is applied to it (correctness); or data might not arrive in order, with some packets arriving later than others (out of order).
The purpose of this post is to illustrate how to deal with data completeness, data lag and out of order data using a dictionary whose keys expire after a time to live (TTL), called dictttl. The github repo can be found here.
The problem statement at hand is to make sure the data quality in the speed layer is good. This is done by tackling data lag, correctness, latency, completeness and out of order data. In this post I will be working on the data completeness, lag and out of order problems.
Sometimes data lag is too high, either because of the plumbing of the pipeline, or simply because there is too much data in a particular window for a key, so running the computation function on it takes longer than usual.
At times, computation functions drop some data, or not all the packets are picked up at the source, which causes a data completeness problem.
And finally, streaming tools such as Kafka, or the sensors themselves, do not always send data in order. It is important to handle those scenarios as well.
Now, my current, really simplified version of the data pipeline looks like this:
Streaming tool > Big data tool > Storage tool
For example, data arrives at Kafka from a sensor, goes from there to Spark for aggregation, and from there is stored in MongoDB.
I can put a check at the big data tool stage to make sure I handle the problems mentioned above.
To handle lag, I can wait for, say, n units of time to make sure all the packets arrive. If they do not arrive within that time, I can 1) choose to discard them entirely, or 2) log that they arrived late, process them, and update the serving layer with the new results. In this post, I will choose to discard them. Such late packets are called straggler packets.
To handle the out of order problem, I can 1) choose to discard the packets entirely, or 2) take them into account in whichever later window they arrive in, process them, and update the serving view. In this post, I will choose to discard them.
To handle the data completeness problem, I can use a metric based on the value of the sensor. For example, I can say that the sum of all the values for a sensor in a particular window should be at least m. If it is less than that, I discard the window.
With this logic in mind, let’s design the simplified version of the pipeline.
To design the pipeline, I will be using Kafka and Pandas: Kafka for streaming the data, and Pandas for aggregation and quality checks. I have chosen not to store the results in any database or flat file.
To handle the out of order and lag situations, I will wait for 5 seconds until all the data from the sensor arrives. If it does not, I will discard it and move on.
To handle the completeness problem, I will sum all the values for a sensor and require the sum to be at least 7000. If it is less than that, I will discard that row (and use the batch layer to compute the speed view).
I will send data from a producer to a consumer. The producer will send data regularly, except at two points: at one point it will send a custom value of 1 to demonstrate the completeness problem, and at another point it will sleep for 7 seconds to demonstrate the out of order and lag problem.
The consumer will consume the data using a tumbling window. I create a window of 5 seconds that tumbles from one interval to the next, and within each window I collect all the data points for a sensor, aggregate them (mean), and print the result.
I use a Python library called dictttl to handle the lag and out of order data problems. In a nutshell, it is a dictionary whose keys expire after a configurable time to live.
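To keep the sketches in the rest of this post self-contained, I will use a minimal stand-in for dictttl that captures the same idea: a defaultdict(list)-style dictionary whose keys disappear once they are older than a given TTL. The class name and its methods below are placeholders, not dictttl's actual API; the real library is in the repo linked above.

```python
import time
from collections import defaultdict


class ExpiringListDict:
    """Minimal stand-in for a defaultdict(list)-style dict whose keys expire after `ttl` seconds."""

    def __init__(self, ttl):
        self.ttl = ttl
        # key -> (created_at, [values]), matching the {key: (timestamp, [data])} layout
        self._store = defaultdict(lambda: (time.time(), []))

    def _purge(self):
        # Drop every key whose creation timestamp is older than the TTL.
        now = time.time()
        for key in [k for k, (created, _) in self._store.items() if now - created > self.ttl]:
            del self._store[key]

    def append(self, key, value):
        self._purge()
        self._store[key][1].append(value)

    def items(self):
        self._purge()
        return list(self._store.items())

    def __len__(self):
        self._purge()
        return len(self._store)


# Usage:
window = ExpiringListDict(ttl=5)
window.append("sensor_1", 42)   # -> {'sensor_1': (<created_at>, [42])}
```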
To generate data, I have used a small producer script; it can be found on github.
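A rough sketch of what it does, assuming kafka-python, might look like the following. The topic name sensor_data, the broker address, the message fields, and the pacing are placeholders, not necessarily what the actual script uses.

```python
# producer.py -- rough sketch of the data generator (topic, broker, and field names are placeholders)
import json
import sys
import time

from kafka import KafkaProducer

sensor_id = sys.argv[1]                                 # e.g. "sensor_1"
value_start, value_end = int(sys.argv[2]), int(sys.argv[3])

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for a in range(value_start, value_end):
    value = a
    if a == 10000:
        value = 1          # custom value of 1, to trip the completeness check later
    producer.send("sensor_data", {"sensor_id": sensor_id, "timestamp": time.time(), "value": value})
    if a == 20000:
        time.sleep(7)      # halt for 7 seconds, to demonstrate the lag / out of order case
    time.sleep(0.001)      # small pause so the stream spans several windows (a guess)

producer.flush()
```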
It is basically a command line program that takes in a sensor id and a range(value_start, value_end), generates data based on those parameters, and sends it to the topic. The sensor id stays the same, sensor_1, and I am using the defaultdict(list) variant of dictttl, so the values for a particular window are appended to a list, like this:
{ key: (timestamp, [data]) }, e.g.
{'sensor_1': (16000000000, [1, 2, 4, 4, 2, 4, 5, 5, 6, 3, 1, 2, 3, 4])}
One part of the producer makes sure that once the counter a reaches 10000, it sends a data value of 1, so that the consumer can later check the sum of the elements and discard that window. This will be shown later.
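In the producer sketch above, those are the lines:

```python
    if a == 10000:
        value = 1          # custom value of 1, to trip the completeness check later
```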
Another part makes sure that once a reaches 20000, the producer sends the data but then halts for 7 seconds. This will be useful later.
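In the sketch, those are the lines:

```python
    if a == 20000:
        time.sleep(7)      # halt for 7 seconds, to demonstrate the lag / out of order case
```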
The consumer consumes the data and checks whether the timestamp of each packet is less than the starting point of the current window. If it is, the packet is discarded, which partially solves the out of order and late data problem (packets that belong to a previous window). If the packet falls inside the window, it is appended to the expiring dict, which has a time to live of 5 seconds.
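A rough sketch of that consumer loop, using the ExpiringListDict stand-in from earlier in place of dictttl (topic and field names are again placeholders; the real code is in the repo):

```python
# consumer.py -- rough sketch (topic, field names and the ExpiringListDict stand-in are placeholders)
import json
import time

import pandas as pd
from kafka import KafkaConsumer

WINDOW_SIZE = 5          # tumbling window length / TTL in seconds
SUM_THRESHOLD = 7000     # completeness threshold for a window

consumer = KafkaConsumer(
    "sensor_data",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

window = ExpiringListDict(ttl=WINDOW_SIZE)   # stand-in for dictttl's defaultdict(list) variant
window_start = time.time()

for record in consumer:
    packet = record.value

    # Late / out of order packet from a previous window: discard it.
    if packet["timestamp"] < window_start:
        continue

    # Packet lies beyond the end of the current window: close the window.
    if packet["timestamp"] >= window_start + WINDOW_SIZE:
        if len(window) == 0:
            # Everything expired (e.g. the producer halted for 7 seconds,
            # longer than the 5 second TTL): just open the next window.
            pass
        else:
            df = pd.DataFrame(
                [(k, v) for k, (created, vals) in window.items() for v in vals],
                columns=["sensor_id", "value"],
            )
            if df["value"].sum() < SUM_THRESHOLD:
                print("drop the row")                            # completeness check failed
            else:
                print(df.groupby("sensor_id")["value"].mean())   # per-sensor mean for the window
            window = ExpiringListDict(ttl=WINDOW_SIZE)
        # Advance the window until the current packet falls inside it.
        while packet["timestamp"] >= window_start + WINDOW_SIZE:
            window_start += WINDOW_SIZE

    window.append(packet["sensor_id"], packet["value"])
```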
The next part of the consumer checks whether the incoming data falls outside the current window. If it does, the consumer first checks whether the expiring dictionary is empty. If it is, it creates the next window and continues. The dictionary can be empty because its keys have expired, which happens here because at one point the producer halts for 7 seconds while the threshold for discarding data is 5 seconds.
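In the sketch above, that corresponds to:

```python
    if packet["timestamp"] >= window_start + WINDOW_SIZE:
        if len(window) == 0:
            # Everything expired (e.g. the producer halted for 7 seconds,
            # longer than the 5 second TTL): just open the next window.
            pass
```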
If it is not empty, the consumer creates a data frame from the dictionary and checks whether the sum of all the values for the sensor is at least 7000. If it is not, it prints "drop the row".
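From the sketch above, these are the lines:

```python
            df = pd.DataFrame(
                [(k, v) for k, (created, vals) in window.items() for v in vals],
                columns=["sensor_id", "value"],
            )
            if df["value"].sum() < SUM_THRESHOLD:
                print("drop the row")   # completeness check failed
```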
Finally, one line performs the aggregation (mean) and prints it.
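That is this line in the sketch:

```python
            print(df.groupby("sensor_id")["value"].mean())   # per-sensor mean for the window
```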
The result looks like the following:
Note that the first three timestamps are sequential. At the fourth entry, the length of the dict is 0 because the dictionary expired, and the next timestamp is 5 seconds later than the previous one. Also, the sum of the values is less than 7000 in the fourth entry, so it is dropped. The next three rows are again sequential.
In this post, I went through the kinds of problems that arise when dealing with streaming data and how to tackle them. I used a library called dictttl, which has expiring keys; in situations where we choose to discard data, expiring keys are really useful, as we do not have to manually search for and delete those items.
As seen above, we handled the out of order and lag problem by creating a buffer that absorbs data within a threshold; any data that arrives after the threshold is discarded. Again, this is a very simple example, and it assumes mostly ordered data: if a packet arrives ahead of time, it will simply create the new window. In cases where the out of order problem is severe, we can create a buffer for that too, for example by deciding that after 1% of out of order cases we will discard the values.
We also saw how to handle the data completeness problem by taking the sum of the values: if the sum is less than the threshold, we simply delete those items.
I hope you enjoyed my post.