This article is part of Alibaba’s Flink series.
As an open-source framework for big data computing, Apache Flink has undergone extensive optimization to meet users’ growing demands. Alibaba Group runs the framework in a large-scale production environment, and the needs of that environment have led its real-time computing team to contribute many of Flink’s most valuable optimizations, benefiting both the Flink community and Alibaba.
In this article, we look at Alibaba’s contributions to two key layers of the framework’s architecture, the SQL layer and the Runtime layer, tracing developments that include Flink’s Query Processor, BinaryRow, and AsyncOperator components.
To let users write code for their business logic once and run it unchanged in both batch and streaming scenarios, Flink first needed to give them a unified API. Alibaba Group’s real-time computing team concluded that SQL was the best choice for this purpose: it is a proven standard that has been tested over decades of use in batch processing.
In stream computing, the concept of dynamic tables has recently emerged as a way to describe stream computation in SQL with the same semantics as batch processing. Users can therefore describe their business logic in SQL, and the same query can run either as a batch job or as a high-throughput, low-latency streaming job. A single query can even first compute historical data with batch processing techniques and then switch automatically to stream computing to handle the latest real-time data.
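To make the idea concrete, here is a minimal Python sketch, assuming a hypothetical click stream and the query `SELECT user, COUNT(*) FROM clicks GROUP BY user`. The batch function evaluates the query once over all rows, while the streaming function updates the result as each row arrives and emits a changelog of retractions (`-`) and insertions (`+`). The function names and the changelog encoding are illustrative assumptions, not Flink’s API, but they show why the two executions are equivalent: applying the changelog to an empty table yields the batch result.

```python
from collections import Counter

def batch_count(rows):
    """Batch semantics: evaluate the query once over the complete input."""
    return dict(Counter(user for user, _url in rows))

def streaming_count(rows):
    """Stream semantics: update the result per arriving row and emit a changelog
    of retractions ('-') and insertions ('+') describing the dynamic result table."""
    counts = {}
    changelog = []
    for user, _url in rows:
        old = counts.get(user)
        if old is not None:
            changelog.append(("-", user, old))  # retract the previous result row
        counts[user] = (old or 0) + 1
        changelog.append(("+", user, counts[user]))
    return changelog

def materialize(changelog):
    """Apply a changelog to an initially empty table."""
    table = {}
    for op, user, cnt in changelog:
        if op == "+":
            table[user] = cnt
        else:
            del table[user]
    return table

clicks = [("alice", "/home"), ("bob", "/cart"), ("alice", "/prod")]
print(streaming_count(clicks))
# [('+', 'alice', 1), ('+', 'bob', 1), ('-', 'alice', 1), ('+', 'alice', 2)]
print(materialize(streaming_count(clicks)) == batch_count(clicks))  # True
```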
Because SQL is declarative, the engine has more freedom in deciding how to execute a query, and therefore more room for optimization. The following sections look at several of the most important of these optimizations in detail.
Alibaba’s first key optimization to Flink was to upgrade and replace its SQL layer architecture.
Developers who have studied or used Flink will likely know that it has two basic APIs: DataStream, for streaming users, and DataSet, for batch users. The two APIs have completely different execution paths and generate different tasks for execution. After a series of optimizations, Flink’s native SQL layer calls either the DataSet API or the DataStream API, depending on whether the user needs batch or stream processing. As a result, developers face two largely independent technology stacks in daily development and optimization, so much of the work has to be done twice, and an optimization made in one stack is not available to the other.
To address these issues, Alibaba proposed a new Query Processor for the SQL layer. It consists mainly of a query optimization layer (QueryOptimizer) that streaming and batch share as much as possible, and a Query Executor built on the same StreamOperator interface for both. With this model, more than 80 percent of the work, including common optimization rules and underlying data structures, can be reused across the two stacks. At the same time, streaming and batch each keep their own specific optimizations and operators to suit different jobs.
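A toy illustration of the reuse idea, assuming a hypothetical rule-based optimizer over a logical plan represented as a simple list of operator names. Shared rules run for both modes, and only a small set of mode-specific rules differ. None of these names (`push_filter_below_project`, `sort_agg`, `incremental_agg`) come from Flink; they are placeholders for the kind of split described above.

```python
from typing import Callable, Dict, List

Plan = List[str]               # logical plan: operator names from source to sink
Rule = Callable[[Plan], Plan]

def push_filter_below_project(plan: Plan) -> Plan:
    """Shared rule: swap a 'filter' with the 'project' directly before it."""
    out = list(plan)
    for i in range(1, len(out)):
        if out[i] == "filter" and out[i - 1] == "project":
            out[i - 1], out[i] = out[i], out[i - 1]
    return out

def choose_batch_agg(plan: Plan) -> Plan:
    """Batch-only rule: aggregate over the complete, bounded input."""
    return ["sort_agg" if op == "agg" else op for op in plan]

def choose_stream_agg(plan: Plan) -> Plan:
    """Stream-only rule: update aggregates incrementally per record."""
    return ["incremental_agg" if op == "agg" else op for op in plan]

SHARED_RULES: List[Rule] = [push_filter_below_project]
MODE_RULES: Dict[str, List[Rule]] = {
    "batch": [choose_batch_agg],
    "stream": [choose_stream_agg],
}

def optimize(plan: Plan, mode: str) -> Plan:
    """Apply the shared rules first, then the rules specific to `mode`."""
    for rule in SHARED_RULES + MODE_RULES[mode]:
        plan = rule(plan)
    return plan

logical = ["scan", "project", "filter", "agg"]
print(optimize(logical, "batch"))   # ['scan', 'filter', 'project', 'sort_agg']
print(optimize(logical, "stream"))  # ['scan', 'filter', 'project', 'incremental_agg']
```

Adding a rule to `SHARED_RULES` benefits both modes at once, which is the point of sharing the optimization layer.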
With the SQL layer’s architecture unified, Alibaba began looking for a more efficient underlying data structure to speed up the SQL layer in Blink, Alibaba’s internal version of Flink.
Native Flink SQL uses a single data structure called Row, which represents a row in a relational database as a collection of Java objects. For example, if a row consists of an integer, a floating-point number, and a string, the Row contains a Java Integer, a Java Double, and a Java String. As developers know well, these Java objects carry considerable heap overhead and also introduce unnecessary boxing and unboxing when the data is accessed.
To solve these problems, Alibaba proposed a new data structure called BinaryRow, which, like Row, represents a row in a relational database but stores the data in binary form. In the previous example, the three fields of different types are all stored together in a single Java byte[], which brings several benefits.
First, storage is more compact because much of the per-object overhead is eliminated. Second, many unnecessary serialization and deserialization steps can be avoided when data is sent over the network or written to state storage. Finally, with the unnecessary boxing and unboxing removed, the execution code as a whole is friendlier to the garbage collector (GC).
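The following Python sketch shows a simplified binary row layout, loosely modeled on the idea behind BinaryRow: a null bitmap, one fixed 8-byte slot per field, and a variable-length area for strings, whose slot holds an offset and a length. The exact layout, the 64-field limit, and the class name are assumptions for illustration; Flink’s real format differs in detail. Because the row is already a single byte buffer, sending it over the network or writing it to state is just a copy of `row.data`, with no per-field serialization.

```python
import struct

class BinaryRow:
    """Simplified binary row: [8-byte null bitmap][8-byte slot per field][variable-length area].
    Supports up to 64 fields of type int, float, or str (illustrative layout only)."""

    def __init__(self, values):
        if len(values) > 64:
            raise ValueError("this sketch supports at most 64 fields")
        fixed_len = 8 + 8 * len(values)        # bitmap + one slot per field
        null_bits = 0
        slots = bytearray()
        var_area = bytearray()
        for i, v in enumerate(values):
            if v is None:
                null_bits |= 1 << i
                slots += bytes(8)              # empty slot for a null field
            elif isinstance(v, int):
                slots += struct.pack("<q", v)  # 64-bit integer stored inline
            elif isinstance(v, float):
                slots += struct.pack("<d", v)  # 64-bit double stored inline
            elif isinstance(v, str):
                data = v.encode("utf-8")
                # slot holds (offset, length) pointing into the variable-length area
                slots += struct.pack("<II", fixed_len + len(var_area), len(data))
                var_area += data
            else:
                raise TypeError(f"unsupported type: {type(v).__name__}")
        self.data = struct.pack("<Q", null_bits) + bytes(slots) + bytes(var_area)

    # Accessors rely on the schema being known in advance, as it is in SQL.
    def is_null(self, i):
        return (struct.unpack_from("<Q", self.data, 0)[0] >> i) & 1 == 1

    def get_int(self, i):
        return struct.unpack_from("<q", self.data, 8 + 8 * i)[0]

    def get_double(self, i):
        return struct.unpack_from("<d", self.data, 8 + 8 * i)[0]

    def get_string(self, i):
        offset, length = struct.unpack_from("<II", self.data, 8 + 8 * i)
        return self.data[offset:offset + length].decode("utf-8")

row = BinaryRow([42, 3.5, "flink"])
print(len(row.data))                                          # 37
print(row.get_int(0), row.get_double(1), row.get_string(2))  # 42 3.5 flink
```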
With this new, efficient underlying data structure, the SQL layer’s overall execution efficiency more than doubles.
At the operator implementation level, Alibaba has introduced a wider range of code generation techniques to Flink. Because the technical architecture and underlying data structures are now unified, many code generation techniques can be reused more widely. And because SQL is strongly typed, Flink knows in advance the types of data each operator will process, so it can generate more specific and efficient execution code.
In native Flink SQL, code generation applies only to simple expressions such as a>2 or c+d. With Alibaba’s optimizations, some operators, such as sorting and aggregation, can be code-generated as a whole. This allows more flexible control over the operator’s logic and lets the final running code be embedded directly into the generated class, eliminating expensive function-call overhead. In addition, underlying data structures and algorithms that use code generation, such as sorting algorithms and a HashMap based on binary data, can be shared between streaming and batch operators, so users genuinely benefit from the unified technology and architecture. Optimizing these data structures and algorithms for certain batch processing scenarios can therefore also improve stream computing performance.
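The contrast between interpreting an expression and generating code for it can be sketched in Python, with `compile` and `exec` standing in for the runtime compilation of generated Java source that Flink performs. The tuple-based expression tree and the function names are hypothetical. The interpreter dispatches on every node for every row, whereas the generated function is compiled once and then evaluates the whole predicate as straight-line code.

```python
import operator

# Expression tree for the filter `a > 2 AND c + d > 10`, as a planner might hold it.
EXPR = ("and",
        ("gt", ("col", "a"), ("lit", 2)),
        ("gt", ("add", ("col", "c"), ("col", "d")), ("lit", 10)))

OPS = {"gt": operator.gt, "add": operator.add}

def interpret(expr, row):
    """Interpreted evaluation: walk the tree and dispatch per node, for every row."""
    kind = expr[0]
    if kind == "col":
        return row[expr[1]]
    if kind == "lit":
        return expr[1]
    if kind == "and":
        return interpret(expr[1], row) and interpret(expr[2], row)
    return OPS[kind](interpret(expr[1], row), interpret(expr[2], row))

def to_source(expr):
    """Translate the tree into a single Python expression string."""
    kind = expr[0]
    if kind == "col":
        return f"row[{expr[1]!r}]"
    if kind == "lit":
        return repr(expr[1])
    symbol = {"and": "and", "gt": ">", "add": "+"}[kind]
    return f"({to_source(expr[1])} {symbol} {to_source(expr[2])})"

def generate(expr):
    """Code generation: emit source once, compile it, and reuse the compiled function."""
    src = f"def generated_filter(row):\n    return {to_source(expr)}\n"
    namespace = {}
    exec(compile(src, "<generated>", "exec"), namespace)
    return namespace["generated_filter"], src

fn, src = generate(EXPR)
print(src)
rows = [{"a": 3, "c": 4, "d": 7}, {"a": 1, "c": 9, "d": 9}]
print([fn(r) for r in rows])               # [True, False]
print([interpret(EXPR, r) for r in rows])  # [True, False]
```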
The optimizations discussed so far have all focused on changes to Flink’s SQL layer. The following sections look at important improvements Alibaba has contributed to Flink’s Runtime layer.
As expected, Alibaba’s real-time computing team encountered a number of challenges in getting Flink to take root in Alibaba’s large-scale production environment.
The first problem was integrating Flink with other cluster management systems. Flink’s native cluster management model was still incomplete and could not natively use other, more mature cluster managers. Specifically, Alibaba needed to work out how to coordinate resources among multiple tenants, how to request and release resources dynamically, and how to specify different resource types.
To solve these problems, Alibaba’s real-time computing team carried out extensive research and analysis and ultimately chose to rework Flink’s resource scheduling system so that Flink can run natively on YARN clusters. The team also refactored the Master architecture so that each job has its own Master, which keeps the Master from becoming a cluster-wide bottleneck. Building on this work, Alibaba and the Flink community jointly launched the FLIP-6 architecture, which makes Flink’s resource management pluggable and lays a sound foundation for Flink’s long-term development. Flink can now run seamlessly on YARN, Mesos, and Kubernetes (K8s), a clear demonstration of the value of this architecture.
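A minimal sketch of the pluggable design, assuming hypothetical Python classes that are not Flink’s actual interfaces: each job gets its own `JobMaster`, and the master talks only to an abstract `ResourceManager`, so supporting another cluster manager means adding one more implementation. The toy backends merely generate names; a real YARN or Kubernetes backend would call that system’s API.

```python
from abc import ABC, abstractmethod
from typing import List

class ResourceManager(ABC):
    """Pluggable boundary: a job master depends only on this interface."""
    @abstractmethod
    def request_slots(self, job_id: str, count: int) -> List[str]: ...

    @abstractmethod
    def release_slots(self, slots: List[str]) -> None: ...

class CountingResourceManager(ResourceManager):
    """Shared bookkeeping for the toy backends; `prefix` names the unit of allocation."""
    prefix = "slot"

    def __init__(self):
        self._next_id = 0
        self.in_use = set()

    def request_slots(self, job_id, count):
        slots = [f"{self.prefix}-{job_id}-{self._next_id + i}" for i in range(count)]
        self._next_id += count
        self.in_use.update(slots)
        return slots

    def release_slots(self, slots):
        self.in_use.difference_update(slots)

class YarnResourceManager(CountingResourceManager):
    prefix = "yarn-container"  # a real backend would request YARN containers

class KubernetesResourceManager(CountingResourceManager):
    prefix = "k8s-pod"         # a real backend would create Kubernetes pods

class JobMaster:
    """One master per job, so no single master serves the whole cluster."""
    def __init__(self, job_id: str, rm: ResourceManager):
        self.job_id, self.rm = job_id, rm
        self.slots: List[str] = []

    def start(self, parallelism: int):
        self.slots = self.rm.request_slots(self.job_id, parallelism)

    def finish(self):
        self.rm.release_slots(self.slots)  # resources are returned dynamically
        self.slots = []

rm = KubernetesResourceManager()
job = JobMaster("job-1", rm)
job.start(parallelism=2)
print(job.slots)   # ['k8s-pod-job-1-0', 'k8s-pod-job-1-1']
job.finish()
print(rm.in_use)   # set()
```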
With these challenges solved, Alibaba turned next to improving Flink’s reliability and stability, focusing on its FailOver mechanism to ensure high availability in the production environment.
The first step was to improve Flink’s Master FailOver. In native Flink, a Master FailOver restarts all jobs; with Alibaba’s improvements, a Master FailOver no longer affects running jobs at all. Next, Alibaba introduced Region-based Task FailOver, so that when a task fails, only the region containing that task is restarted rather than the entire job, minimizing the impact on users. With these improvements, many of Alibaba’s business teams have begun migrating their real-time computing to Flink.
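Region-based failover can be sketched as grouping tasks into connected components. This Python sketch assumes that tasks linked by pipelined (streaming) data exchanges must restart together, so each region is a connected component of the pipelined-edge graph, and a failure restarts only the failed task’s region. The function names and the example topology are hypothetical.

```python
from collections import defaultdict

def failover_regions(tasks, pipelined_edges):
    """Map each task to its region: the tasks reachable from it over pipelined edges
    (followed in either direction)."""
    adjacency = defaultdict(set)
    for up, down in pipelined_edges:
        adjacency[up].add(down)
        adjacency[down].add(up)
    region_of = {}
    for start in tasks:
        if start in region_of:
            continue
        stack, members = [start], set()
        while stack:                      # depth-first search over the component
            t = stack.pop()
            if t in members:
                continue
            members.add(t)
            stack.extend(adjacency[t] - members)
        region = frozenset(members)
        for t in members:
            region_of[t] = region
    return region_of

def tasks_to_restart(failed_task, region_of):
    """Region-based failover: restart only the failed task's region, not the whole job."""
    return region_of[failed_task]

tasks = ["src1", "map1", "sink1", "src2", "map2", "sink2"]
edges = [("src1", "map1"), ("map1", "sink1"), ("src2", "map2"), ("map2", "sink2")]
regions = failover_regions(tasks, edges)
print(sorted(tasks_to_restart("map1", regions)))  # ['map1', 'sink1', 'src1']
```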
Stateful stream processing is one of Flink’s greatest strengths: its Checkpoint mechanism, based on the Chandy-Lamport algorithm, provides exactly-once consistency. However, in early Flink versions, Checkpoint performance became a bottleneck at large data volumes, which Alibaba set out to resolve by optimizing Checkpoint.
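Flink’s checkpoints use a variant of the Chandy-Lamport algorithm in which barriers flow through the data stream. The sketch below, a simplified Python model with hypothetical names, shows barrier alignment in a two-input operator: once an input delivers a barrier, its later records are buffered until every input has delivered the same barrier, at which point the operator snapshots its state and forwards the barrier. It assumes a single checkpoint in flight and a synchronous snapshot.

```python
class AligningOperator:
    """Multi-input operator that aligns checkpoint barriers before snapshotting."""

    def __init__(self, num_inputs):
        self.num_inputs = num_inputs
        self.count = 0           # operator state: number of records processed
        self.blocked = set()     # inputs that already delivered the current barrier
        self.buffered = []       # records held back from blocked inputs
        self.snapshots = {}      # checkpoint_id -> snapshot of the state
        self.output = []         # records and barriers emitted downstream

    def on_record(self, input_id, record):
        if input_id in self.blocked:
            self.buffered.append(record)   # belongs after the checkpoint
        else:
            self._process(record)

    def on_barrier(self, input_id, checkpoint_id):
        self.blocked.add(input_id)
        if len(self.blocked) == self.num_inputs:
            self.snapshots[checkpoint_id] = self.count      # consistent snapshot
            self.output.append(("barrier", checkpoint_id))  # forward the barrier
            self.blocked.clear()
            pending, self.buffered = self.buffered, []
            for record in pending:                          # resume held-back input
                self._process(record)

    def _process(self, record):
        self.count += 1
        self.output.append(("record", record))

op = AligningOperator(num_inputs=2)
op.on_record(0, "a")
op.on_barrier(0, checkpoint_id=1)
op.on_record(0, "b")   # after input 0's barrier: buffered
op.on_record(1, "x")   # input 1 not yet aligned: processed
op.on_barrier(1, checkpoint_id=1)
print(op.snapshots)    # {1: 2}
```

The snapshot for checkpoint 1 counts `a` and `x` but not `b`, because `b` arrived after input 0’s barrier; this separation is what makes the snapshot consistent.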
First, Alibaba developed an incremental Checkpoint mechanism. Large jobs in Alibaba’s production environment generate dozens of terabytes of State data, so taking a full checkpoint every time would be prohibitively expensive. With incremental Checkpoint, each checkpoint uploads only what has changed, so its cost drops to a small, roughly constant flow of data.
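One way to see why incremental checkpoints keep costs small is to model state as a set of immutable files, loosely following how an LSM-tree state backend such as RocksDB works. This Python sketch, with a hypothetical class name and illustrative file sizes, uploads only files that are not already in durable storage; each checkpoint’s manifest still lists every file needed for recovery. Garbage collection of files that no checkpoint references any more is left out.

```python
class IncrementalCheckpointer:
    """Upload only state files not yet in durable storage; later checkpoints
    reference files uploaded by earlier ones."""

    def __init__(self):
        self.uploaded = {}  # file name -> size in bytes already in durable storage

    def checkpoint(self, local_files):
        """local_files maps immutable file name -> size for the current state.
        Returns (manifest of all files needed to restore, bytes uploaded now)."""
        new_bytes = 0
        for name, size in local_files.items():
            if name not in self.uploaded:
                self.uploaded[name] = size   # upload only the new file
                new_bytes += size
        manifest = sorted(local_files)
        return manifest, new_bytes

cp = IncrementalCheckpointer()
_, sent1 = cp.checkpoint({"sst-1": 100, "sst-2": 100})
manifest, sent2 = cp.checkpoint({"sst-1": 100, "sst-2": 100, "sst-3": 10})
print(sent1, sent2, manifest)  # 200 10 ['sst-1', 'sst-2', 'sst-3']
```

A full checkpoint would have uploaded 210 bytes the second time; the incremental one uploads 10.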
Second, Alibaba developed a mechanism for merging small files in Checkpoint, to deal with the growing number of checkpoint files. As the number of Flink jobs in a cluster grows, the number of checkpoint files eventually overwhelms the HDFS NameNode. By merging many small checkpoint files into a single large file, Alibaba’s approach reduces the load on the NameNode by a factor of several dozen.
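The merging idea can be sketched as packing many small blobs into one container file plus an index of offsets and lengths, so each logical file can still be read back individually. The names here are hypothetical, and the sketch ignores fault tolerance and cleanup; it only shows why the NameNode, which holds metadata for every file, ends up tracking one file instead of a hundred.

```python
import io

def merge_small_files(blobs):
    """Pack many small checkpoint blobs into one container; return (bytes, index)."""
    out = io.BytesIO()
    index = {}
    for name, data in blobs.items():
        index[name] = (out.tell(), len(data))  # where this logical file lives
        out.write(data)
    return out.getvalue(), index

def read_logical_file(container, index, name):
    """Read one logical file back using its (offset, length) entry."""
    offset, length = index[name]
    return container[offset:offset + length]

blobs = {f"task-{i}/state": f"state of task {i}".encode() for i in range(100)}
container, index = merge_small_files(blobs)
print(len(blobs), "logical files -> 1 physical file")
print(read_logical_file(container, index, "task-7/state"))  # b'state of task 7'
```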
Although all data could in principle be kept in State, for historical reasons users still need to store some data in external key-value (KV) stores such as HBase, and Flink jobs then need to access that external data. Because Flink has always processed records in a single-threaded model, the latency of these external accesses becomes a bottleneck for the entire system. Asynchronous access is the direct solution, but it is difficult for users to write multithreaded code in a UDF while still guaranteeing exactly-once semantics.
To solve this problem, Alibaba proposed the AsyncOperator for Flink, which makes it remarkably easy to write asynchronous calls in a Flink job and greatly increases job throughput.
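The core idea behind asynchronous access can be sketched with Python’s `asyncio`: keep a bounded number of external lookups in flight at once and still emit results in input order. The `lookup` coroutine is a hypothetical stand-in for an HBase get with simulated latency, and this is plain `asyncio`, not Flink’s API (Flink exposes the feature through its Async I/O API, for example `AsyncDataStream.orderedWait`). The sketch also omits the checkpointing of in-flight requests that exactly-once semantics require.

```python
import asyncio

async def lookup(key):
    """Hypothetical external KV lookup; latency is simulated with sleep."""
    await asyncio.sleep(0.01 * (len(key) % 3))
    return f"profile:{key}"

async def enrich(keys, capacity):
    """Keep up to `capacity` lookups in flight and return results in input order."""
    semaphore = asyncio.Semaphore(capacity)

    async def one(key):
        async with semaphore:          # bound the number of concurrent requests
            return key, await lookup(key)

    # gather returns results in the order the awaitables were passed in
    return await asyncio.gather(*(one(k) for k in keys))

results = asyncio.run(enrich(["u1", "user22", "u333"], capacity=2))
print(results)
```

Emitting in input order corresponds to an ordered mode; emitting each result as soon as it completes would trade ordering for lower latency.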
Flink’s key distinguishing feature is its design as a computing engine that unifies batch and stream processing, which is why batch computing users who have experienced its fast stream computing capabilities are taking an interest in it. However, batch computing brings new challenges to the framework, and Alibaba has done extensive work to address them, contributing original components along the way.
For task scheduling, Alibaba has provided a more flexible scheduling mechanism that dispatches tasks more efficiently based on the dependencies between them. In addition, Flink’s native Shuffle Service is bound to the TaskManager (TM): after a task finishes, its TM must be kept to serve the shuffle data, so its resources cannot be released. The original batch shuffle also does not merge files, which made it unusable in production. Alibaba solved these problems by developing the YARN Shuffle Service, and in the process found that adding a new Shuffle Service required intrusive changes in many places in Flink’s code. To let other developers easily plug in different shuffle implementations, Alibaba refactored Flink’s shuffle architecture to make it pluggable. Alibaba’s search service, for example, now runs Flink batch jobs in production.
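Dependency-based dispatch can be illustrated with a topological sort that groups tasks into waves: a task is dispatched only when all producers feeding it through blocking exchanges have finished. This Python sketch uses hypothetical names and a plain Kahn-style ordering, so it is far simpler than a production scheduler. It also hints at why an external shuffle service matters: once a wave’s output is held by the shuffle service, that wave’s resources can be released.

```python
from collections import defaultdict

def scheduling_waves(tasks, blocking_edges):
    """Group tasks into waves; a task is dispatched only after all of its upstream
    producers (connected by blocking exchanges) have finished."""
    indegree = {t: 0 for t in tasks}
    downstream = defaultdict(list)
    for up, down in blocking_edges:
        downstream[up].append(down)
        indegree[down] += 1
    ready = [t for t in tasks if indegree[t] == 0]  # tasks with no producers
    waves = []
    while ready:
        waves.append(ready)
        next_ready = []
        for t in ready:
            for d in downstream[t]:
                indegree[d] -= 1
                if indegree[d] == 0:                # all producers finished
                    next_ready.append(d)
        ready = next_ready
    if sum(len(w) for w in waves) != len(tasks):
        raise ValueError("dependency cycle")
    return waves

print(scheduling_waves(
    ["scan_a", "scan_b", "join", "agg"],
    [("scan_a", "join"), ("scan_b", "join"), ("join", "agg")],
))  # [['scan_a', 'scan_b'], ['join'], ['agg']]
```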
The optimizations described above are the result of three years of intensive work by Alibaba, after which Blink began to meet Alibaba’s needs in its large-scale production environment. Optimization and improvement of the Runtime never ends, however, and Alibaba continues to work on a number of important improvements and optimizations bound for future versions of Flink.