In this part I will talk about the serving layer of the Lambda Architecture. The serving layer is derived either by performing computation on batch data to arrive at a view that sits midway between the speed layer and the batch layer, or by collecting enough data from the speed layer for a window (whose size varies from application to application) and performing computations on that. In our case, example queries answer questions that pertain to a single day. In this post I will show how to do that by aggregating data over an interval that is smaller than the range of the whole dataset but bigger than the 5 minute window we used in the speed layer, and then presenting some visualizations. You can find the code for the article here.
The main requirement satisfied by the serving layer is that it allows arbitrary precomputed views to be queried with low latency. It indexes the data and provides an interface to access the precomputed views.
The serving layer is a database that supports batch updates and random reads, but not random writes. When new batches of data are available from the speed layer or the batch layer, they are inserted (or upserted) into the database. In this way the serving layer satisfies several requirements of the Lambda Architecture.
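To make the "batch updates and random reads, but no random writes" contract concrete, here is a minimal in-memory sketch. The class name `ServingView` and its methods are hypothetical and only illustrate the access pattern; a real serving layer (Elasticsearch here) distributes this index across machines.

```python
from typing import Dict, Iterable, Optional, Tuple


class ServingView:
    """Toy serving-layer view: whole-batch upserts plus keyed random reads.

    There is deliberately no method for writing a single record; new data
    only arrives as a complete batch, mirroring the contract described above.
    """

    def __init__(self) -> None:
        self._index: Dict[str, dict] = {}

    def load_batch(self, batch: Iterable[Tuple[str, dict]]) -> int:
        """Upsert every (key, document) pair in the batch; return how many were written."""
        count = 0
        for key, doc in batch:
            self._index[key] = dict(doc)  # upsert: replaces any existing document
            count += 1
        return count

    def get(self, key: str) -> Optional[dict]:
        """Random read by key; returns None if the key is unknown."""
        doc = self._index.get(key)
        return dict(doc) if doc is not None else None

    def __len__(self) -> int:
        return len(self._index)
```

Loading a second batch that contains an existing key simply replaces that document, which is the upsert behaviour mentioned above.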
The serving layer is distributed across machines for scalability and fault tolerance, and its indexes are created, stored and served in a distributed manner. When designing indexes, two things have to be considered: throughput and latency. Latency is the time required to answer a single query, and throughput is the number of queries that can be answered in a given time frame.
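The two metrics can be measured separately. The sketch below is a hypothetical helper (`measure` is not part of any library) that times a sequence of lookups against any query function and reports mean latency and overall throughput.

```python
import time
from typing import Callable, Dict, Sequence


def measure(query: Callable[[str], object], keys: Sequence[str]) -> Dict[str, float]:
    """Run query(key) once per key, one after another, and report both metrics.

    latency_s:      mean wall-clock time to answer a single query
    throughput_qps: number of queries answered per second over the whole run
    """
    if not keys:
        raise ValueError("need at least one key to measure")
    latencies = []
    start = time.perf_counter()
    for key in keys:
        t0 = time.perf_counter()
        query(key)
        latencies.append(time.perf_counter() - t0)
    elapsed = time.perf_counter() - start
    return {
        "latency_s": sum(latencies) / len(latencies),
        # Guard against a zero-length run on very coarse clocks.
        "throughput_qps": len(keys) / elapsed if elapsed > 0 else float("inf"),
    }
```

With a single sequential client, throughput is roughly the inverse of latency. A distributed serving layer raises throughput by answering many queries in parallel on different machines, which does not by itself make any single query faster.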
The requirements of a serving layer are
With these requirements in mind, the design I show later in the article follows the same pattern. I collect data from the streaming dataset, perform computations on it and store the results in a NoSQL database. The database is fault tolerant and scalable, and serves low-latency results to queries. After that, I can also visualize the data to answer questions whose granularity lies between that of the batch layer and that of the speed layer (a day to a month).
The dataset was obtained from Academic Torrents and consists of trips made by taxis in New York, including the medallion number, pickup and dropoff times and locations, distance travelled, and trip duration in seconds. It contains several more columns (passenger count, for example, is used later), but only these are relevant to us at the moment. An example is shown below:
medallion, pickup_datetime, dropoff_datetime, trip_time_in_secs, trip_distance, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude
740BD5BE61840BE4FE3905CC3EBE3E7E, 2013-10-01 12:44:29, 2013-10-01 12:53:26, 536, 1.2, -73.974319, 40.741859, -73.99115, 40.742424
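A row like the one above can be turned into a typed record with the standard library. This is a minimal sketch that assumes the field order of the sample row and ASCII hyphens in the dates; the `Trip` type and `parse_trip` function are hypothetical names, and the coordinate columns are ignored.

```python
import csv
import io
from datetime import datetime
from typing import NamedTuple

TIME_FMT = "%Y-%m-%d %H:%M:%S"


class Trip(NamedTuple):
    medallion: str
    pickup: datetime
    dropoff: datetime
    trip_time_in_secs: int
    trip_distance: float


def parse_trip(line: str) -> Trip:
    """Parse one comma-separated record laid out like the sample row."""
    # csv handles quoting; strip() removes the space that follows each comma.
    fields = [f.strip() for f in next(csv.reader(io.StringIO(line)))]
    return Trip(
        medallion=fields[0],
        pickup=datetime.strptime(fields[1], TIME_FMT),
        dropoff=datetime.strptime(fields[2], TIME_FMT),
        trip_time_in_secs=int(fields[3]),
        trip_distance=float(fields[4]),
    )
```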
Based on this data, the goal of the task at hand is to determine
The tech stack is Spark, Python, Elasticsearch and Kibana. Although we have a static file, I simulated a streaming dataset using Kafka. The data is aggregated in Spark, written back to Kafka, then indexed in Elasticsearch, and finally visualized in Kibana. For the purpose of demonstration, streaming the whole dataset is not required; one month's worth of data is enough.
I also went with Spark Structured Streaming instead of the older DStream (micro-batch) API. In my opinion, Structured Streaming has not matured as much as Kafka, and Spark does not offer as many functions as Kafka does. With a Kafka consumer you just get a row of data and can offload the rest of the processing to Python, for example by integrating with sinks such as NoSQL databases or flat files. In Spark, you need connectors for other sinks, and not many are available. You can use foreach or foreachBatch, but they are not the standard way to sink data. The version I am using is 2.4.6.
Similar to what I did in the speed layer, I first produce the data to a Kafka topic. I simulate a streaming dataset by pushing one row at a time, and this time I push all the rows instead of only a couple of thousand. The code to do that is as follows.
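The row-at-a-time replay can be sketched independently of any Kafka client by passing in the publish function. This is a minimal sketch, not the author's producer: `replay` and the topic name `taxi-trips` are hypothetical, and JSON is assumed as the message format.

```python
import json
import time
from typing import Callable, Iterable


def replay(rows: Iterable[dict], send: Callable[[str, bytes], object],
           topic: str, delay_s: float = 0.0) -> int:
    """Push rows to `send` one at a time to imitate a live stream.

    `send(topic, payload)` is any function that publishes bytes to a topic.
    An optional sleep between rows slows the replay down to a chosen rate.
    """
    sent = 0
    for row in rows:
        send(topic, json.dumps(row).encode("utf-8"))
        sent += 1
        if delay_s:
            time.sleep(delay_s)
    return sent
```

With the kafka-python package, `send` could be the `send` method of a `KafkaProducer(bootstrap_servers="localhost:9092")` (broker address assumed), followed by `producer.flush()` once the replay finishes so buffered messages are delivered.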
After the data has been produced to a Kafka topic, I consume it from that topic and perform the aggregation. In Spark Structured Streaming you specify a window duration on an event-time column, and Spark assigns each record to the window it falls in (tumbling windows by default, or sliding windows if you also pass a slide duration) and aggregates the data per window. In our case, the window size is 24 hours.
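The window assignment can be modelled in plain Python. The sketch below is not Spark's API (in PySpark the equivalent is `pyspark.sql.functions.window(timeColumn, windowDuration, slideDuration)`); it assumes windows are aligned to the Unix epoch, which as far as I know is Spark's default when no start offset is given, and treats naive datetimes as UTC.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_for(ts: datetime, size: timedelta) -> tuple:
    """Return the [start, end) tumbling window of length `size` containing ts."""
    n = (ts - EPOCH) // size          # whole windows elapsed since the epoch
    start = EPOCH + n * size
    return start, start + size


def sliding_windows_for(ts: datetime, size: timedelta, slide: timedelta) -> list:
    """All [start, end) windows of length `size`, starting every `slide`, that contain ts."""
    start = EPOCH + ((ts - EPOCH) // slide) * slide   # latest window start <= ts
    windows = []
    while start > ts - size:          # window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)
```

With a 24 hour size and no slide, every trip lands in exactly one calendar-day window, which is the per-day view this post needs; a sliding window would count each trip in several overlapping windows.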
There is also the option of watermarking, which allows Spark to handle late data. Spark tracks the latest event time it has seen, and a watermark of, for example, 5 minutes means that data arriving up to 5 minutes behind that latest event time is still used to update the aggregate of the window it belongs to, even if that window is older than the current one. Data that is later than that may be dropped.
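The accept-or-drop rule can be modelled with a small class. This is a plain-Python sketch of the idea, not Spark's API (in PySpark you call `df.withWatermark("pickup_datetime", "5 minutes")`, column name assumed). It simplifies two things: Spark advances the watermark between micro-batches rather than per record, and it only guarantees that data within the threshold is kept.

```python
from datetime import datetime, timedelta
from typing import Optional


class Watermark:
    """Model of an event-time watermark that trails the latest event time by `delay`."""

    def __init__(self, delay: timedelta) -> None:
        self.delay = delay
        self.max_event_time: Optional[datetime] = None

    @property
    def current(self) -> Optional[datetime]:
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def accept(self, event_time: datetime) -> bool:
        """Return True if the event may still update its window, False if too late."""
        wm = self.current
        if wm is not None and event_time < wm:
            return False  # later than the allowed lateness: dropped
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return True
```

For example, after an event at 12:10 with a 5 minute delay, an event stamped 12:06 is still accepted, while one stamped 12:04 is rejected.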
There are three output modes: Complete, Append and Update. Output in Spark Structured Streaming is produced batch by batch: Complete mode outputs the entire result table at every batch, while Append mode outputs only the new rows at every batch. There is one caveat. In Append mode, an aggregated window is only output once the watermark has passed the end of that window, and if the data is disordered (as it was in my case, with no order at all), the watermark has to be really large for every window to be aggregated, which means waiting a long time for results.
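The difference between the two modes can be simulated for a running count per window. In this hypothetical sketch, event times are plain integers (for example minutes), late events are not dropped, and the watermark is applied at the end of the same batch; Spark's exact timing differs slightly, but the shape of the output is the same. In Spark the mode is chosen with `writeStream.outputMode("complete")` or `outputMode("append")`.

```python
from collections import defaultdict


def run_batches(batches, window_of, watermark_delay):
    """Simulate Complete vs Append output for a running count per window.

    batches: list of micro-batches, each a list of integer event times.
    window_of: function mapping an event time to its (start, end) window.
    Returns (complete_outputs, append_outputs), one entry per batch.
    """
    counts = defaultdict(int)
    emitted = set()
    max_t = None
    complete_out, append_out = [], []
    for batch in batches:
        for t in batch:
            counts[window_of(t)] += 1
            max_t = t if max_t is None else max(max_t, t)
        watermark = max_t - watermark_delay if max_t is not None else None
        # Complete mode: the whole result table, repeated every batch.
        complete_out.append(dict(counts))
        # Append mode: only windows the watermark has closed, each emitted once.
        closed = {w: c for w, c in counts.items()
                  if watermark is not None and w[1] <= watermark and w not in emitted}
        emitted.update(closed)
        append_out.append(closed)
    return complete_out, append_out
```

Append mode emits nothing until the watermark passes a window's end, whereas Complete mode repeats every window in every batch; those repeats are the duplicate entries mentioned next.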
An alternative is to use Complete mode, which gives us the aggregation at the cost of duplicate entries in the output; this is what I did. I aggregated the data in 24 hour windows, calculated the sum and average of trip time, trip distance and passenger count, and saved the output to a Kafka topic. The code is as follows.
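The aggregation itself can be expressed without Spark. This sketch groups trips by medallion and epoch-aligned 24 hour window and computes sum and average per field; in PySpark the same idea would be a `groupBy(window(...), "medallion").agg(...)`. The `passenger_count` column, the `time_start`/`time_end` keys and the `sum_`/`avg_` output names are assumptions, and JSON is assumed as the Kafka message format.

```python
import json
from collections import defaultdict
from datetime import datetime, timedelta

DAY = timedelta(hours=24)
EPOCH = datetime(1970, 1, 1)
FIELDS = ("trip_time_in_secs", "trip_distance", "passenger_count")


def aggregate_daily(trips):
    """Group trips by (medallion, 24h window) and compute sum and average per field.

    Each trip is a dict with 'medallion', 'pickup_datetime' (datetime) and the FIELDS keys.
    """
    groups = defaultdict(list)
    for trip in trips:
        start = EPOCH + ((trip["pickup_datetime"] - EPOCH) // DAY) * DAY
        groups[(trip["medallion"], start)].append(trip)
    results = []
    for (medallion, start), rows in sorted(groups.items()):
        record = {"medallion": medallion,
                  "time_start": start.isoformat(),
                  "time_end": (start + DAY).isoformat()}
        for field in FIELDS:
            total = sum(r[field] for r in rows)
            record[f"sum_{field}"] = total
            record[f"avg_{field}"] = total / len(rows)
        results.append(record)
    return results


def to_kafka_value(record):
    """Serialize one aggregate as UTF-8 JSON bytes for a Kafka message."""
    return json.dumps(record, sort_keys=True).encode("utf-8")
```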
After the aggregated data has been saved to a Kafka topic, I consume it from that topic, save it to a file, then read from the file and save the data to Elasticsearch. The code that does this is as follows.
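The file round trip and the indexing request can be sketched with the standard library. This assumes the aggregates are stored as JSON lines and sent through Elasticsearch's `_bulk` endpoint, whose body is newline-delimited JSON with an action line before each document; the helper names and the index name `taxi-daily` are hypothetical.

```python
import json


def write_jsonl(path, records):
    """Write one JSON document per line."""
    with open(path, "w", encoding="utf-8") as f:
        for rec in records:
            f.write(json.dumps(rec) + "\n")


def read_jsonl(path):
    """Read back every non-empty line as a JSON document."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


def bulk_body(records, index, doc_id):
    """Build an NDJSON body for the _bulk endpoint: action line, then source line."""
    lines = []
    for rec in records:
        lines.append(json.dumps({"index": {"_index": index, "_id": doc_id(rec)}}))
        lines.append(json.dumps(rec))
    return "\n".join(lines) + "\n"  # the bulk body must end with a newline
```

The body can be POSTed to `/_bulk` with the `application/x-ndjson` content type; with the official Python client, `elasticsearch.helpers.bulk` accepts equivalent action dicts instead.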
To avoid adding duplicate entries to Elasticsearch, I use the medallion number + time_start as the document id, which is unique for every cab and day. That way only unique documents are stored. Alternatively, I could have removed duplicates from the file before adding entries to the database. Now we can visualize the data.
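The effect of a deterministic id can be shown with a dict standing in for the index: indexing a document whose `_id` already exists replaces the stored one. The underscore separator in `doc_id` is my own choice, and the sketch assumes records for the same key arrive in the order Complete mode emitted them, so the last one is the most up to date.

```python
def doc_id(record):
    """Hypothetical id scheme: medallion + window start, unique per cab per day."""
    return f'{record["medallion"]}_{record["time_start"]}'


def upsert_all(records):
    """Index records by id; a later duplicate overwrites the earlier one."""
    index = {}
    for rec in records:
        index[doc_id(rec)] = rec
    return index
```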
Coming back to the goals that were outlined earlier, we now start answering questions.
a) How many cabs operate on a single day?
Kibana shows that on a single day there were 7500 cabs operating (on 28th Jan).
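Counting cabs per day amounts to counting distinct medallions per window start in the daily aggregates. A minimal sketch over records shaped like the ones above (hypothetical helper name):

```python
from collections import defaultdict


def cabs_per_day(records):
    """Count distinct medallions for each window start."""
    cabs = defaultdict(set)
    for rec in records:
        cabs[rec["time_start"]].add(rec["medallion"])
    return {day: len(medallions) for day, medallions in cabs.items()}
```

Kibana's Unique Count metric is backed by Elasticsearch's cardinality aggregation, which is approximate once the number of distinct values exceeds its precision threshold, so a figure in the thousands from that metric should be read as an estimate.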
b) What is the most frequent average trip time for all cabs?
From the figure, we can see that the most frequent average trip time is between 8 and 12 minutes, and the highest is around 1.5 hours. So most cabs make only short trips.
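"Most frequent" here means the fullest bar of a fixed-interval histogram. With the default offset of 0, Elasticsearch's histogram aggregation places a value in the bucket starting at floor(value / interval) * interval; the hypothetical helper below reproduces that bucketing and returns the fullest bucket. The input values in the usage below are made up for illustration, not taken from the dataset.

```python
import math
from collections import Counter


def most_frequent_bucket(values, width):
    """Bin values into [k*width, (k+1)*width) and return the fullest bucket and its count."""
    if not values:
        raise ValueError("no values to bin")
    counts = Counter(math.floor(v / width) for v in values)
    k, n = counts.most_common(1)[0]
    return (k * width, (k + 1) * width), n
```

For example, `most_frequent_bucket([9, 10, 11, 11.5, 30, 90], 4)` returns `((8, 12), 4)`: four of the six values fall in the 8 to 12 bucket.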
c) What is the most frequent total trip time for all cabs?
As we can see from the image, the most frequent total trip time is close to 0, which means most cabs either don't drive at all or drive very short distances that take about 10 minutes. The next most frequent total trip time is around 1.5 hours.
d) What is the most frequent average trip distance for all cabs?
We can see that the most frequent average trip distance is 1.5 to 2.5 miles, which is consistent with an average trip time of about 10 minutes: most cabs do not travel long distances.
e) What is the most frequent total trip distance for all cabs?
The most frequent total trip distance is 0–5 miles, which is consistent with a total trip time of 0–10 minutes. Next is a value of around 90 miles, which is consistent with a total trip time of 1.5 hours. Again, we see that most cabs drive a short distance within the city.
f) What is the most frequent average passenger count for all cabs?
As we can see from the image, the most frequent average passenger count is 1, followed by 2 and 5. That means the majority of cabs carry a single passenger per trip throughout the day.
g) What is the most frequent total passenger count for all cabs?
The most frequent sum of passenger counts across all cabs is 1, followed by 25–35. This means most cabs carry a grand total of 1 passenger throughout the day, followed by cabs that carry 25–35 passengers.
h) The maximum distance driven by any cab in a day is 214 miles, and the minimum is 0.
i) The maximum time spent driving in a day by any driver is 12.6 hours, and the minimum is 0.
j) The maximum number of passengers chauffeured by any driver in a day is 426, and the minimum is 0.
In this article we went through what a serving layer is and how it fulfils the requirements of a data system that is distributed, fault tolerant, and serves low-latency queries. We then saw how to implement a serving layer based on data from the speed layer (instead of the batch layer, which is updated from the master dataset). We covered how Spark handles windowing and watermarking, then saw a real-world example of the scope of serving layer queries, and how to answer them by indexing the data into Elasticsearch and visualizing it in Kibana.
We found that most cabs drive a short distance throughout the day, their trips take about 10 minutes on average, and they usually carry only 1 passenger. The data could also be inserted into another NoSQL database such as Cassandra or MongoDB, with views over 24 hour intervals as in this post. That data can then be used by a machine learning model for predictive analysis, where the window duration can vary from application to application.
Read about Speed layer here
And about Batch layer here
I hope you liked reading my post.