by Miyuru Dayarathna and Srinath Perera
Stream processing enables users to query continuous data streams and detect conditions quickly, within a small time period measured from the time the data is received. The detection time period may vary from a few milliseconds to minutes. For example, with stream processing, you can receive an alert by querying a data stream coming from a temperature sensor and detecting when the temperature has reached the freezing point. (If you are new to the topic, please read A Gentle Introduction to Stream Processing.)
If you want to build an app that handles streaming data and makes real-time decisions, you have two choices: use a tool or build it yourself. The answer depends on the complexity of your use case, your scalability requirements, and your reliability and fault tolerance requirements.
If you want to build the app yourself, first place events in a message broker topic (e.g., in ActiveMQ, RabbitMQ, or Kafka), then write code to receive events from topics in the broker (they become your stream), and finally publish results back to the broker. Code that performs these three steps is called an actor in the stream processing world.
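The three steps above can be sketched as follows. This is a minimal illustration, not a production design: the two `queue.Queue` objects are in-memory stand-ins for broker topics, and the names `temperature_topic`, `alert_topic`, and `run_actor` are hypothetical. A real actor would use the client library of the chosen broker and would block waiting for new events.

```python
import queue

# In-memory stand-ins for two broker topics (an assumption for illustration;
# a real deployment would use a client for ActiveMQ, RabbitMQ, or Kafka).
temperature_topic = queue.Queue()
alert_topic = queue.Queue()

FREEZING_POINT_C = 0.0


def run_actor(source, sink):
    """Receive events from `source`, apply the logic, publish results to `sink`."""
    while True:
        try:
            event = source.get_nowait()   # step 2: receive an event from the topic
        except queue.Empty:
            break                         # a real actor would block and wait instead
        if event["celsius"] <= FREEZING_POINT_C:
            # step 3: publish the result back to the broker
            sink.put({"alert": "freezing", **event})


# Step 1: place events in the input topic.
for reading in [
    {"sensor": "s1", "celsius": 4.2},
    {"sensor": "s1", "celsius": -0.5},
    {"sensor": "s2", "celsius": 1.0},
]:
    temperature_topic.put(reading)

run_actor(temperature_topic, alert_topic)
alerts = list(alert_topic.queue)   # drain the output topic for inspection
print(alerts)
```

Even in this small form, the sketch leaves out everything a stream processor would normally handle for you, such as ordering, scaling, and recovery from failures.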
However, instead of coding the above scenario from scratch, you can save time by using a stream processing framework. An event stream processor lets you write the logic using Streaming SQL and takes care of the rest.
You can either send events directly to the stream processor or send them via a broker. An event stream processor will do the hard work by collecting data, delivering it to each actor, making sure they run in the right order, collecting results, scaling if the load is high, and handling failures.
The next question is, “Which stream processor should you use?” We have both good and bad news. The good news is that there are so many (see the Quora question: What are the best stream processing solutions out there?), so you have a lot of choices. The bad news is also that there are so many, and you will likely face the paradox of choice.
Too many Considerations
Different stream processing systems support different features. The aspects to consider include the following:
- Data Ingestion with a Message Broker
- Writing Queries with Streaming SQL
- Stream Processing APIs & Query Writing Environment
- Reliability, High Availability (HA), and Minimal HA
- Business user friendliness via drag and drop type Graphical User Interfaces (GUIs)
- Streaming Machine Learning
- Message processing guarantee
- Out-of-order events
- Large Scale System Performance (Scalability, Handle large windows)
That is a lot of features. We struggled with how to make a decision that considers all these aspects across 19 stream processors.
Idea: “The Average Product”
After thinking about this for some time, we came up with two ideas.
The first is that there are must-have features and optional features. The must-have features are those that most users will end up using in a stream processing app and are therefore required. The rest are optional features.
The second idea is “the average product”. We define the average product as a hypothetical product that has only the features supported by more than 2/3 of the stream processors on the market.
You can then evaluate each product against the average product. If the product has features beyond those of the average product, that is a positive; if it is missing features that the average product has, that is a negative.
The average product gives us a reference point against which to compare and contrast each product.
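The idea of the average product can be sketched in a few lines. The survey data below is hypothetical and serves only to show the mechanics; the product names, feature names, and the helper functions `average_product` and `compare` are assumptions made for this illustration. The threshold is treated as "2/3 or more", which is one reading of the definition.

```python
from collections import Counter
from fractions import Fraction

# Hypothetical survey data: product name -> set of supported features.
survey = {
    "A": {"distributed", "at_least_once", "checkpointing", "streaming_ml"},
    "B": {"distributed", "at_least_once", "checkpointing"},
    "C": {"distributed", "at_least_once", "exactly_once"},
}


def average_product(survey, threshold=Fraction(2, 3)):
    """Return the features supported by at least `threshold` of the products."""
    counts = Counter(f for features in survey.values() for f in features)
    return {f for f, n in counts.items() if Fraction(n, len(survey)) >= threshold}


def compare(features, baseline):
    """Features beyond the baseline are positives; missing ones are negatives."""
    return {"extra": features - baseline, "missing": baseline - features}


baseline = average_product(survey)
for name, features in sorted(survey.items()):
    print(name, compare(features, baseline))
```

With this data, product A gains a positive for `streaming_ml`, while product C gains a positive for `exactly_once` and a negative for the missing `checkpointing`.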
After surveying 19 stream processors, we chose the following four as must-have features.
- Support for Data Ingestion with a Message Broker
- Streaming SQL
- Stream Processing APIs & Query Writing Environment
- Reliability, High Availability (HA), and Minimal HA
We have discussed these four must-have features in detail in our InfoQ article, How to Choose a Stream Processor for Your App?
Choosing Based on Advanced (Optional) Features
This article discusses the optional features and how we can use the idea of the average product (which we also call the general product) to evaluate them.
As per our definition, the general stream processing product has the following features. In other words, ⅔ or more of the products have the following features.
- Can run in a distributed manner and is scalable
- Supports at-least-once message delivery guarantees
- Provides fault tolerance to node failures
- Supports periodic checkpoints and recovery
- Supports back pressure
- Supports transformation, aggregation, correlation, time window operators
- Supports debugging
- Supports multiple data sources and sinks
- Supports writing custom operators
- Supports monitoring of running system
You can evaluate a specific stream processor by first selecting the optional features that matter to your application and then comparing the stream processor against the average product in terms of those selected features.
Let’s discuss each optional feature and the level of support required by different use cases.
Notion of Time
A stream processing application is always a distributed application. First, data is collected from sensors that run on one or more computers, then brought over to a stream processor and processed, and finally the results are delivered to one or more other computers.
Consequently, there is no single notion of time or a single clock across all nodes in the system (this is a well-known result in distributed systems). There are multiple notions of time, and users choose among them based on their application.
The notion of time of a stream processor is of three types: event time, stream time, and processing time.
- Event time: the time when the event was actually generated
- Stream time: the time at which the stream processor receives the data into the platform
- Processing time: the time at which the stream processor processes the data
The notion of time becomes very important for streaming applications which deal with time-order-sensitive operations.
Different stream processors support time at different levels. Some can handle event time and processing time. Some handle only stream time. Some handle all three. For example, if the alert generation application described above needs to use a time window based on event time and compare a particular field during a certain time range (e.g., 5 pm to 8 am), a stream processor that uses only stream time cannot be used for that purpose, because windows based on stream time may assign events to the wrong time range and raise incorrect alerts. Hence, you should choose a stream processor based on the level of time-ordering support your application requires.
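A small sketch shows how the choice of timestamp changes window results. The events, the 60-second tumbling window, and the function `window_minimum` are all assumptions made for this illustration; they do not describe any particular stream processor.

```python
from collections import defaultdict

WINDOW_SECONDS = 60

# Hypothetical events: (event_time, stream_time, temperature in Celsius).
# The third reading was generated at t=58 but reached the processor at t=65.
events = [
    (10, 11, 1.0),
    (30, 31, 0.5),
    (58, 65, -2.0),
    (70, 71, 3.0),
]


def window_minimum(events, time_index):
    """Group events into tumbling windows by the chosen timestamp and
    return the minimum temperature seen in each window."""
    windows = defaultdict(list)
    for event in events:
        window_start = (event[time_index] // WINDOW_SECONDS) * WINDOW_SECONDS
        windows[window_start].append(event[2])
    return {start: min(values) for start, values in sorted(windows.items())}


by_event_time = window_minimum(events, time_index=0)
by_stream_time = window_minimum(events, time_index=1)
print(by_event_time)   # freezing reading falls in the window starting at 0
print(by_stream_time)  # freezing reading falls in the window starting at 60
```

The same sub-zero reading is attributed to different windows depending on which timestamp is used, so an alert tied to a specific time range could fire for the wrong period when only stream time is available.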
Stream processors that distinguish between event time and stream time have to deal with the correctness issue of out-of-order events, because events quite often get out of order while in transit to the stream processor’s receiver. Out-of-order events in a stream arise for multiple reasons, such as operator parallelization, network latency, and the merging of asynchronous streams. An out-of-order handling technique is usually used to handle such situations. Out-of-order processing is discussed in detail in a later section of this article.
Business User Friendliness
Support for business users to build stream processing applications improves the usability of a stream processor. A business user is someone who needs to use streaming analytics for a business use case but does not have good technical knowledge of the underlying aspects of the stream processing solution. Business user-friendliness allows both developers and business users to design, deploy, and customize their use cases.
An example where such business user-friendliness is needed is a business that is spread across different geographical locations. In such a use case, the operation centers at each location execute the same process with different variables. In such a scenario, designing the process in an abstract manner and customizing it for each location significantly simplifies the use case.
There are several approaches by which stream processors achieve business user-friendliness. One approach is the use of spreadsheets for stream processing. Although business users (i.e., domain experts) are not always programmers, they are typically familiar with spreadsheet processors, since more than 500 million people worldwide are reported to use spreadsheets. Spreadsheets have been used for visualizing live streams, for live programming to compute new streams, and for exporting computations to be run on a server so that they can be shared with other users and persisted beyond the life of the spreadsheet. The paper “Spreadsheets for Stream Processing with Unbounded Windows and Partitions” by Hirzel is an example of this approach, although to the best of our knowledge no stream processor supports it yet.
Another example is the use of templates in WSO2 Stream Processor (WSO2 SP). The Business Rules Manager of WSO2 SP allows queries to be specified as templates (i.e., skeletons). A developer who is knowledgeable in writing stream processing queries decides which values should be kept as variables. A business user who has limited or no knowledge of how to write streaming queries can then use the skeletons to create applications. For example, let’s assume we want to write a query to detect when vehicles have exceeded the speed limit, and that business users decide the speed limit based on where the query is deployed. Templates include a form that the business user can fill in to change the behavior of the query. In this way, parameters can change across organizations and business scenarios while the query structure stays the same. Using a Business Rules Manager allows running queries to be generated quickly from a query template by assigning values to its parameters.
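The template idea can be sketched with Python's standard `string.Template`. The query text below is an illustrative streaming-SQL-like string and is not the actual template format of the WSO2 SP Business Rules Manager; the stream names, the `zone` attribute, and the function `render_query` are hypothetical.

```python
from string import Template

# Written once by a developer. The $placeholders are the values left open
# for business users. The query syntax is illustrative only.
QUERY_TEMPLATE = Template(
    "from VehicleStream[speed > $speed_limit and zone == '$zone'] "
    "select vehicleId, speed insert into SpeedAlertStream;"
)


def render_query(form_values):
    """Fill the template with values a business user entered in a form.
    substitute() raises KeyError if a required parameter is missing."""
    return QUERY_TEMPLATE.substitute(form_values)


# Two deployments share the same query structure with different parameters.
city_query = render_query({"speed_limit": 50, "zone": "city"})
highway_query = render_query({"speed_limit": 100, "zone": "highway"})
print(city_query)
print(highway_query)
```

The business user only supplies the form values; the query structure stays under the developer's control.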
As discussed earlier, drag and drop type Graphical User Interfaces (GUIs) allow applications to be implemented by encapsulating query logic in icons and property windows. For example, once the user drags and drops a stream component into the drag and drop GUI, they can use the stream component’s associated properties window to add the attribute names and their types.
Only 32% of the stream processors we investigated provide at least some support for business users to develop stream processing applications. Providing pre-built business applications and business user tools also helps business users quickly transform their business processes with the stream processor. Of the stream processors surveyed, only 3 provide such pre-built business applications.
Integration to Machine Learning
Machine learning (ML) learns from data and solves problems that are hard to solve by writing an algorithm by hand. For example, a machine learning model may be used to detect fraudulent activities. A classifier could be incorporated into the alert generation application to generate alerts with better accuracy than simple criteria such as a greater-than comparison.
Most new use cases will incorporate machine learning in some way. Hence, we need stream processors to support machine learning models. However, integrating ML algorithms into stream processing applications is not a trivial task.
Traditional batch-based ML typically has two phases: model training and model-based prediction. The common approach is to first train the model using batch algorithms and then import the pre-trained ML model into the stream processing application. There are only a few stream processors which have built-in ML capabilities. In order to load a pre-trained ML model into a stream processing application, the model needs to be saved in a well-known format such as PMML so that the model can be loaded without any formatting issues.
Some stream processors also support streaming machine learning, a technology that can build and improve the model as data comes in. However, the accuracy of streaming machine learning is lower than that of models built with classical (batch-based) machine learning. At the same time, classical models do not improve as new data arrives. Hence, they need to be retrained from time to time.
This is a trade-off. If your streaming application needs machine learning models, you need to choose between batch and streaming machine learning by weighing accuracy against concept drift (how quickly a model becomes stale as the incoming data changes).
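The drift side of this trade-off can be illustrated with a toy model. The sketch below is a deliberately simple assumption: a one-parameter linear model `y = w * x`, fitted once on a batch and then either frozen or updated per event with stochastic gradient descent. It does not show the accuracy penalty that streaming learners can have on stationary data, and the names `fit_batch` and `StreamingModel` are hypothetical.

```python
def fit_batch(xs, ys):
    """Least-squares slope for y = w * x, computed once over a stored batch."""
    return sum(x * y for x, y in zip(xs, ys)) / sum(x * x for x in xs)


class StreamingModel:
    """The same one-parameter model, updated one event at a time."""

    def __init__(self, w=0.0, learning_rate=0.05):
        self.w = w
        self.learning_rate = learning_rate

    def predict(self, x):
        return self.w * x

    def learn(self, x, y):
        error = self.predict(x) - y
        self.w -= self.learning_rate * error * x   # one gradient step


# Historical data follows y = 2x, so the batch model learns w = 2.
history_x = [1.0, 2.0, 3.0, 4.0]
history_y = [2.0 * x for x in history_x]
batch_w = fit_batch(history_x, history_y)

# Concept drift: the live stream now follows y = 3x.
streaming = StreamingModel(w=batch_w)
batch_error = 0.0
streaming_error = 0.0
for i in range(200):
    x = 1.0 + (i % 4)
    y = 3.0 * x
    batch_error += abs(batch_w * x - y)              # frozen model
    streaming_error += abs(streaming.predict(x) - y)
    streaming.learn(x, y)                            # model adapts after predicting

print(batch_w, streaming.w, batch_error, streaming_error)
```

In this toy setting the frozen batch model keeps making the same error until it is retrained, while the streaming model's error shrinks as it follows the new relationship.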
Message Processing Guarantee
“What level of accuracy do you want your app to demonstrate?” The answer is determined by the message processing guarantee offered by the stream processor. The message processing guarantee (i.e., semantics) determines how reliable the stream processor’s message delivery is. Which semantics you need depends on what makes sense in your use case. There are three main message processing guarantees: at most once, at least once, and exactly once.
The weakest guarantee is at most once. At most once actually does not guarantee that it will deliver anything. For example, two events may go into the application from the incoming stream and only one message may come out from the application. When implementing at most once there is no state required at the sender or the receiver since there is no acknowledgment protocol involved. At most once does not introduce additional performance overhead and it is the easiest to implement among the three message processing guarantees.
A stronger guarantee is at least once, where each message is processed at least once. This means the message will definitely make it to the recipient, but there is the possibility of duplicates. A system failure can result in the same message being repeated multiple times in the output stream. Hence, downstream applications need to handle such duplicate messages carefully. If processing a message is idempotent, the at-least-once guarantee is sufficient for your use case. To implement at least once, the sender has to track to whom each message was sent, which messages have not been acknowledged yet, and when a message has to be redelivered. The at-least-once guarantee is good enough for many use cases. For example, for an alert generation application, at least once is sufficient because it makes sure the alert notification is delivered to the intended parties.
The third type of message processing guarantee is exactly once, which is sought by most of the stream processing community although it may or may not make sense. The exactly-once guarantee is required for some mission-critical applications such as purchase orders, financial trading systems, flight reservations, hotel reservations, etc. If two messages go into the application and a system crash happens, exactly once guarantees that each message is still processed exactly one time. When implementing the exactly-once guarantee, we need to keep state on both the sender and the receiver. The receiver also has to keep track of which messages it has already seen and processed, at least for some time. Therefore, this approach is the most expensive to implement among the three message processing guarantees.
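The difference between at-least-once delivery and effectively-once processing can be sketched with a simulated lost acknowledgement. Everything here is an assumption made for illustration: the `Sender` and `Receiver` classes, the rule that the first acknowledgement of every message is lost, and the in-memory `seen_ids` set. A real system would need durable state and a bounded retention period for the seen IDs.

```python
class Receiver:
    def __init__(self, deduplicate):
        self.deduplicate = deduplicate
        self.seen_ids = set()   # receiver-side state used to detect duplicates
        self.total = 0

    def deliver(self, message_id, amount):
        if self.deduplicate and message_id in self.seen_ids:
            return True         # duplicate: acknowledge again, do not reprocess
        self.seen_ids.add(message_id)
        self.total += amount
        return True             # acknowledgement


class Sender:
    def __init__(self, receiver):
        self.receiver = receiver
        self.attempts = {}      # sender-side state: delivery attempts per message

    def send(self, message_id, amount, max_attempts=5):
        """Redeliver until an acknowledgement arrives (at-least-once)."""
        for _ in range(max_attempts):
            self.attempts[message_id] = self.attempts.get(message_id, 0) + 1
            acked = self.receiver.deliver(message_id, amount)
            # Simulated failure: the first acknowledgement of each message is lost.
            ack_lost = self.attempts[message_id] == 1
            if acked and not ack_lost:
                return True
        return False


payments = [("p1", 10), ("p2", 20), ("p3", 30)]


def run(deduplicate):
    receiver = Receiver(deduplicate)
    sender = Sender(receiver)
    for message_id, amount in payments:
        sender.send(message_id, amount)
    return receiver.total


total_without_dedup = run(deduplicate=False)   # every payment counted twice
total_with_dedup = run(deduplicate=True)       # every payment counted once
print(total_without_dedup, total_with_dedup)
```

The extra bookkeeping on both sides is the cost the text refers to: the sender tracks unacknowledged messages, and the receiver remembers which messages it has already processed.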
Many stream processing use cases only need the at-most-once or at-least-once guarantee. We observed that about 47% of the 19 stream processors supported at-least-once semantics, 37% supported exactly once, and 21% had no message processing guarantee at all.
If your streaming application needs stronger guarantees, you should select the stream processor accordingly.
Out-of-order processing and handling uncertainty
What will happen if the input data arrives in a different time order from the one in which it was produced at the source? Out-of-order events (i.e., late-arriving events) in a stream arise for multiple reasons, such as distributed sensors, operator parallelization, network latency, and the merging of asynchronous streams. Processing events without regard to their order may produce wrong results. For example, handling out-of-order events is needed to ensure the correct operation of event pattern matching. Suppose an alert is configured to be generated if event X arrives after event Y; that is, the query detects events ordered as Y followed by X. If the query receives X before Y because the events arrived out of order, it will not match. Similarly, an application with a time window that is triggered based on event time may also be affected by such out-of-order events. However, certain applications, such as event filters, are not affected by out-of-order events.
Out-of-order event handling is a topic that is being actively explored. There are several high-level approaches.
The first is to intercept the events and reorder them in a buffer before processing them. This method adds latency proportional to the maximum amount by which an event can be delayed. It works when the delay caused by out-of-order arrival is small and bounded.
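A minimal sketch of the buffer-based approach follows, assuming integer event timestamps and a known bound `max_delay` on how late an event can be. The function `reorder` and the sample data are hypothetical.

```python
import heapq


def reorder(events, max_delay):
    """Yield (timestamp, payload) pairs in timestamp order, assuming no event
    arrives more than `max_delay` time units after a later-stamped event."""
    buffer = []      # min-heap ordered by timestamp
    latest = None    # highest timestamp seen so far
    for seq, (timestamp, payload) in enumerate(events):
        # `seq` breaks ties so that payloads are never compared.
        heapq.heappush(buffer, (timestamp, seq, payload))
        latest = timestamp if latest is None else max(latest, timestamp)
        # Anything at or below latest - max_delay can no longer be overtaken.
        while buffer and buffer[0][0] <= latest - max_delay:
            ts, _, data = heapq.heappop(buffer)
            yield ts, data
    while buffer:    # end of stream: flush what is left
        ts, _, data = heapq.heappop(buffer)
        yield ts, data


# Arrival order differs from timestamp order: X (t=2) arrives after t=3.
arrivals = [(1, "Y"), (3, "b"), (2, "X"), (5, "c"), (4, "d"), (6, "e")]
ordered = list(reorder(arrivals, max_delay=2))
print(ordered)
```

Each event is held back until the highest timestamp seen is at least `max_delay` ahead of it, which is where the added latency comes from. An event that is later than the bound would still be emitted out of order.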
The second approach is to build resilient operators that can work with and recover from out-of-order events. This, however, makes the operators complex to implement. An example is the use of punctuations. Punctuation-based techniques depend on special tuples sent with the data streams. Punctuations explicitly inform a query operator when to return results for windows. Hence, unlike the buffer-based technique described above, the query operator can consume out-of-order input directly.
The third approach is called speculation, where we apply a compensation technique to correct previously emitted inaccurate query results when out-of-order events are observed. Here we assume in-order arrival of tuples and produce the results of a window immediately when the window is closed. When a late arrival e is detected, previously emitted results that are affected by e are invalidated, and new revisions of these results are produced by taking e into account. This approach works best when late arrivals are few. If the stream continuously receives out-of-order events, the corrections may introduce additional latency.
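Speculation can be sketched with a per-window event count. This is a simplified illustration under stated assumptions: tumbling windows of 10 time units, a window is considered closed as soon as an event for a later window arrives, and a revision is emitted as a new output rather than as an explicit retraction. The function `speculative_counts` is hypothetical.

```python
from collections import defaultdict

WINDOW = 10


def speculative_counts(timestamps):
    """Return outputs as (window_start, count, kind), where kind is
    'result' for a first emission and 'revision' for a correction."""
    counts = defaultdict(int)
    outputs = []
    current = None   # start of the window that is currently open
    for timestamp in timestamps:
        window = (timestamp // WINDOW) * WINDOW
        counts[window] += 1
        if current is None:
            current = window
        elif window > current:
            # Assume in-order arrival: close the open window and emit at once.
            outputs.append((current, counts[current], "result"))
            current = window
        elif window < current:
            # Late arrival for a closed window: emit a corrected count.
            outputs.append((window, counts[window], "revision"))
    if current is not None:
        outputs.append((current, counts[current], "result"))   # end of stream
    return outputs


# The event at t=7 arrives after the window [0, 10) has been closed.
outputs = speculative_counts([1, 4, 12, 7, 15, 23])
print(outputs)
```

Downstream consumers must be able to accept the revision and replace the earlier result, which is part of the cost of this approach.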
Out-of-order event handling is not performed by the majority of stream processors, although a significant amount of research has been conducted on various approaches for handling disorder. Only 47% of the 19 stream processors surveyed handle out-of-order events. Furthermore, only a few of that 47% actually reorder events; the others simply drop the out-of-order events.
Large Scale System Performance
A typical stream processing application with event persistence runs at around 50k events/second. Hence, most stream processing scenarios can be implemented with a 2-node HA setup (e.g., see Is your Stream Processor Obese?). However, there can be a requirement to scale the system beyond 2 nodes. Furthermore, windows with a large state may consume a large amount of system memory. A stream processor needs to support handling such a large state.
What will happen if your app has to handle increasing amounts of load? System scalability and performance are important metrics that measure the ability of a stream processor to handle large workloads. The expected level of performance depends on the use case, and once a stream processor is chosen, switching to a different one is very costly.
For example, the use case may initially involve simple filtering operations on small events (with a few fields totaling a few hundred bytes). But over time the events may get bigger as new fields with significantly larger amounts of data (e.g., a couple of megabytes) are added. Furthermore, as the business keeps growing, the application logic may also become more complex, with joins, event pattern matching, window processing, etc. With such changes, the use case may demand several times more performance than the initial system could deliver. Hence, if the use case needs high performance, you need to test whether the candidate stream processors can handle such a heavy workload.
A scalable stream processor can expand its operational scale by adding more resources. If a stream processor is guaranteed to receive only a fixed rate of input events from a fixed set of input sources (e.g., a stream processor for in-vehicle networks), it need not have scalability features. However, if the stream processor is run, for example, as a software service where it needs to handle changing workloads, then the ability to scale is often required. Most stream processors discuss their horizontal scaling ability (adding new nodes) without much detail on vertical scaling (adding resources to the same node). Vertical scalability is a very important factor to consider, especially if the deployment environment cannot be provisioned with more servers. Furthermore, horizontal scaling introduces additional network communication overhead. This means we should first try to scale vertically as much as possible before going for horizontal scaling.
Open source distributed stream processors have published throughput numbers in the range of millions of events per second. Most proprietary stream processors do not have published throughput numbers. Given a fixed, steady input data flow, the throughput of a stream processor is subject to multiple factors such as queue size, number of CPU cores, memory, and inter-node bandwidth. Hence, it is virtually impossible to find studies conducted across the entire list of products. Among the performance numbers we could find for distributed stream processors, the largest throughput was 110 million events/s with a low latency of 30 ms. The best latency found for single-node versions was 0.003 ms.
Handling long aggregation windows
What will happen if the app involves windows with millions of events? Stream processing, in general, applies lightweight one-time processing to a stream of data. Stream applications often have to conduct aggregations over windows. The size of the window matters a lot, since the window has to be kept in memory. If the window consists of millions of events and must be kept in memory, the system may get overloaded, which becomes a great threat to the stability of the system. For example, in telecommunication systems, there can be use cases that work with long-distance call records consisting of 300 million records per day for 100 million customers. Another application is network traffic engineering, where information on current network performance (e.g., latency and bandwidth) is aggregated online and used to monitor and adjust the performance of the network dynamically.
Solutions can be provided from the stream processor side as well as the application side. To handle large windows, the stream processor has to be provisioned with ample memory. If that is not possible, the stream processor has to be able to reduce the memory footprint by compressing the window or by storing part of the window in secondary storage.
The solution from the application point of view is to conduct aggregation over smaller units and then calculate higher-level values from those (i.e., incremental aggregation). For example, if the end result expected from maintaining a day-long window of 300 million records is the total duration of phone calls, the same result can be obtained by aggregating the call records minute by minute, using those results to calculate hourly sums, and finally using the hourly values to calculate the total duration for the entire day.
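The minute, hour, and day roll-up described above can be sketched as follows. The call records are hypothetical, timestamps are assumed to be seconds since midnight, and `roll_up` is a helper introduced for this illustration.

```python
from collections import defaultdict


def roll_up(partials, bucket_seconds):
    """Sum partial totals, keyed by bucket start time, into coarser buckets."""
    totals = defaultdict(int)
    for start, value in partials.items():
        totals[(start // bucket_seconds) * bucket_seconds] += value
    return dict(totals)


# Hypothetical call records: (start in seconds since midnight, duration in seconds).
calls = [(30, 120), (45, 60), (3700, 300), (7300, 45), (86000, 600)]

# As each record arrives, only its minute bucket is updated; the raw record
# does not need to be kept in a day-long window.
per_minute = defaultdict(int)
for start, duration in calls:
    per_minute[(start // 60) * 60] += duration

per_hour = roll_up(per_minute, 3600)    # at most 24 values per day
per_day = roll_up(per_hour, 86400)      # a single value per day
print(per_hour)
print(per_day)
```

This works because a sum can be computed from partial sums. The state kept per day is bounded by the number of minute buckets (1,440) rather than by the number of records. Aggregates that cannot be combined from partial results in this way would need a different treatment.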
Conclusion
Different stream processors have inherent features that suit them to different use cases. When you try to select the stream processor that is best for you, the number of features to consider is large, which makes it difficult to make the right choice.
This article presents a systematic approach to choosing a stream processor.
The article does it by introducing two concepts.
- Must-have features: these are features most streaming applications will need within their lifecycle
- General product: this conceptual product consists of the features that ⅔ or more of stream processing products support. You can compare two stream processors by comparing both to the general product.
The approach answers two main questions. First, to what extent does the stream processor support the core stream processor architecture features? Second, what are the special requirements of the application, and to what extent are they satisfied by the candidate stream processors?
When answering the second question, we can use the idea of the “general product” as the baseline: you can evaluate a specific stream processor by comparing it to this average product.