By: David Anderson, Principal Software Practice Lead at Confluent

Flink SQL is a powerful engine designed for processing real-time, streaming data using the familiar SQL language and its underlying relational concepts. It has long been an effective solution for many application domains, especially ETL pipelines and analytical workloads, but until now Flink SQL has lacked key features often required for event-driven applications. With the recent developments described here, Apache Flink’s relational APIs – namely, Flink SQL and the Table API – have matured to the point where taking advantage of their powerful abstractions and built-in operators no longer requires giving up access to Flink’s low-level stream processing primitives.

Everything said here applies equally to both Flink SQL and the Table API. Both sit on top of the same runtime – the concepts and fundamental capabilities are the same – they are just accessed from Java or Python, rather than SQL, if you choose to use the Table API.

## Use cases

These relational APIs are especially well-suited for two broad categories of applications.

### Agentic AI

Modern AI applications often require access to dynamic, real-time information. Flink SQL facilitates agentic AI by making it easy to combine real-time event streams with insights from agents, models, and services. This means that as streaming data flows through Flink, it can be enriched or validated by querying these services in real time, providing fresh and accurate context for AI decision-making or model inference.

### Shift left

In the world of data processing, "shifting left" refers to performing data transformations and enrichments as early as possible in the data flow. Flink’s relational APIs excel here by enabling real-time transformations and enrichment with other datasets. This allows for immediate insights and the preparation of data for subsequent analysis or consumption.

## Familiarity combined with novelty

Traditionally, SQL has been used for both operational and analytical use cases. Flink SQL takes the well thought-through concepts and semantics developed for Online Transaction Processing (OLTP) and Online Analytical Processing (OLAP), and extends them to satisfy the demands of real-time stream processing applications.

### The relational algebra remains the same

The fundamental principles of relational algebra, which form the bedrock of SQL, are fully preserved in Flink SQL.

- **Tables:** While traditional SQL tables are static, Flink SQL introduces the concept of dynamic tables, which are logical representations of continuous, unbounded streams of data.
- **Aggregations:** Aggregations (e.g., `COUNT`, `SUM`, `AVG`) are applied to streams, providing real-time summaries of data.
- **Joins:** Flink SQL supports various join operations, allowing for the combination of data from different streaming sources. These can range from traditional inner and outer joins to more specialized temporal joins. For more on this, see the section on joins below.
- **Continuous queries (materialized views):** Each Flink SQL statement continuously computes the result of a query on streams, enabling immediate access to frequently requested data. This is fundamentally the same operation that traditional databases perform when they do incremental materialized view maintenance.
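To make these ideas concrete, here is a small sketch of a dynamic table backed by a Kafka topic, together with a continuous aggregation query over it. The table name, schema, and connector options are illustrative, not taken from a real deployment:

```sql
-- A dynamic table backed by a Kafka topic
-- (name, schema, and connector options are illustrative)
CREATE TABLE PageViews (
  userId   STRING,
  url      STRING,
  viewTime TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);

-- A continuous query: the per-URL counts are updated as new page views arrive,
-- much like an incrementally maintained materialized view
SELECT url, COUNT(*) AS views
FROM PageViews
GROUP BY url;
```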
### New challenges and ideas for stream processing

On the other hand, applying these relational concepts to the dynamic world of stream processing introduces some new considerations:

- **Thinking about state:** Stream processing operations often need to maintain "state" – information about past events – to perform their computations. For example, an aggregation needs to keep track of its partially accumulated result, and a join might need to buffer records from one stream while waiting for matching records from another. Developers using Flink SQL (or the Table API) must carefully consider state management, as excessive state can impact performance and resource consumption.
- **Watermarks:** In stream processing, events can arrive out of order or with delays. Watermarks are a crucial concept in Flink SQL that helps to deal with these irregularities by providing a notion of "completeness" for streams. Watermarks define a threshold after which Flink assumes no more events with an earlier timestamp will arrive, enabling consistent windowed aggregations and timely results.

Flink SQL includes a number of special temporal versions of stateful operations, such as time-windowed aggregations and windowed joins, that rely on watermarks to know when their results are ready to be emitted (and when the state they were keeping can be freed).

## Recent developments

Recent developments are making these relational APIs more compelling than ever.

Flink SQL and the Table API are highly extensible through the use of User-Defined Functions (UDFs). These allow Java or Python to be used to implement custom logic that cannot be expressed directly in SQL. UDFs can perform complex calculations, exploit 3rd party libraries, integrate with external systems, or implement domain-specific transformations.

Process Table Functions (PTFs) are a new flavor of UDF that has just been introduced as part of Flink 2.1. Unlike traditional UDFs, PTFs can process entire tables (streams) in a sophisticated manner, using Flink’s managed state and timer services. This allows for user-defined operators, such as custom window functions or joins, that are as feature-rich as the built-in operators.

## Streaming Joins in Flink SQL

Join operations illustrate the similarities and crucial differences in Flink’s relational APIs compared to traditional SQL databases. This section uses some practical examples to explore how the concepts presented above can be applied to real applications that use joins.

Consider, for example, an online business processing orders and payments, where each payment is related by a foreign key reference (the `orderId`) to one order. Orders and payments are Apache Kafka topics, and we want to create a new Kafka topic for paid orders that will be used for order fulfillment.

A simple solution that seems like it should work is:

```sql
CREATE TABLE PaidOrders AS (
  SELECT
    o.id AS orderId,
    p.id AS paymentId
  FROM Orders o
  INNER JOIN Payments p
  ON o.id = p.orderId
);
```

Of course, in a real application we would pull a bunch more information into the output, such as the customer ID, their shipping address, and the IDs of the products being ordered. But let’s keep this example simple, and focus on the big picture.
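For reference, the `Orders` and `Payments` tables used here might be declared along the following lines. Only the `id`, `orderId`, `orderTime`, and `paymentTime` columns are implied by the queries in this post; the rest of the schema, the watermark delays, and the connector options are assumptions made for illustration. The `WATERMARK` clauses give each table the event-time attribute that the time-bounded variant shown below relies on:

```sql
-- Illustrative table definitions for the join example; the watermark delays
-- and connector options are assumptions, not part of the original example
CREATE TABLE Orders (
  id        STRING,
  orderTime TIMESTAMP_LTZ(3),
  WATERMARK FOR orderTime AS orderTime - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json'
);

CREATE TABLE Payments (
  id          STRING,
  orderId     STRING,
  paymentTime TIMESTAMP_LTZ(3),
  WATERMARK FOR paymentTime AS paymentTime - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'payments',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json'
);
```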
In the parlance of the Flink community, a join like this one is called a “regular join” (meaning we haven’t done anything special to allow Flink to execute it more efficiently). And when executed by Flink SQL’s streaming runtime, regular joins can be surprisingly expensive. Hypothetically, any number of future payments might match an older order, requiring the runtime to store all of the orders indefinitely. Of course, in reality, we know this won’t be the case. In fact, based on our knowledge of how this company runs its business, we might know that each order will have at most one matching payment, and so there’s no need for Flink to store an order past the point where its payment has been processed.

One thing we could do to make this join less expensive would be to assume that payments will always happen within two hours of the order (for example):

```sql
CREATE TABLE PaidOrders AS (
  SELECT
    o.id AS orderId,
    p.id AS paymentId
  FROM Orders o
  INNER JOIN Payments p
  ON o.id = p.orderId
  WHERE p.paymentTime BETWEEN o.orderTime AND o.orderTime + INTERVAL '2' HOUR
);
```

This will limit how long the orders are stored in Flink’s runtime state, but it’s not a very satisfying approach. On the one hand, we’d rather free an order’s state as soon as it is matched to a payment, and on the other hand, some payments might be delayed for more than 2 hours. In other words, this is a way to get an answer we can’t trust, and spend more than necessary to get there.

Unfortunately, the one-to-one message enrichment that we’d ideally like to do in this case cannot be directly expressed using standard SQL concepts. This is a good example of where a custom PTF makes sense. Here’s a simplified example of what this might look like:

```java
// Function that buffers one object from each side
// of the join to produce exactly one result.
public static class OrderPaymentJoin extends ProcessTableFunction<JoinResult> {
  public void eval(
      Context ctx,
      @StateHint(name = "result") JoinResult result,
      @ArgumentHint(SET_SEMANTIC_TABLE) Order order,
      @ArgumentHint(SET_SEMANTIC_TABLE) Payment payment
  ) {
    if (order != null) {
      if (result.orderId != null) {
        // Skip duplicates.
        return;
      } else {
        // Save the order and wait
        // for the matching payment.
        result.orderId = order.id;
      }
    }
    if (payment != null) {
      if (result.paymentId != null) {
        // Skip duplicates.
        return;
      } else {
        // The order will precede the payment, but
        // we cannot guarantee the order will be
        // processed first. Save the payment
        // and wait for the matching order.
        result.paymentId = payment.id;
      }
    }
    if (result.orderId != null && result.paymentId != null) {
      // Send out the final join result
      // and clear the state.
      collect(result);
      ctx.clearState("result");
    }
  }
}
```
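Before this function can be called from SQL, it needs to be registered in the catalog like any other user-defined function. A minimal sketch, assuming the class above is packaged as `com.example.OrderPaymentJoin` (a hypothetical class path):

```sql
-- Register the PTF so it can be referenced from SQL
-- (the fully qualified class name is hypothetical)
CREATE FUNCTION OrderPaymentJoin AS 'com.example.OrderPaymentJoin' LANGUAGE JAVA;
```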
With this PTF in place, our join query becomes:

```sql
CREATE TABLE PaidOrders AS (
  SELECT orderId, paymentId
  FROM OrderPaymentJoin(
    order   => TABLE(Orders) PARTITION BY id,
    payment => TABLE(Payments) PARTITION BY orderId
  )
);
```

In practice, this simplified example might be extended to do something about the possibility of accumulating an ever-increasing number of unmatched orders (orders that never receive payments).

## Other exciting developments

### Native support for ML models

FLIP-437 (included in Flink 2.0) added support for machine learning models as first-class citizens in Flink SQL.

### Disaggregated state backend

The Flink community is developing a more cloud-native approach to state management, which will ultimately make it more practical for applications to use enormous amounts of state. This will never eliminate the need to think about the state requirements of your streaming applications, but it will significantly change how you should view the tradeoffs involved. For users of Flink’s SQL and Table APIs, experimenting with this is simply a matter of changing your runtime configuration. See the documentation for more details.

### Semi-structured types

Semi-structured data offers greater flexibility, such as allowing fields to be added over time, optional fields, or fields whose type may vary from row to row. Currently, users of Flink’s relational APIs must choose between ROWs with strict schemas and static types, or storing data as JSON strings. However, working with JSON has significant performance limitations. FLIP-521 is adding a VARIANT type to support semi-structured data that can be efficiently stored and processed.

## For a deeper understanding

To learn more about Flink SQL, this video on continuous queries will get you started.

The documentation on Process Table Functions has several excellent examples, if you want to dive more deeply into this topic.

Sean Falconer makes the case for why The Future of AI Agents is Event-Driven.