In this blog post, we will cover how you can combine the open-source streaming solution bytewax with ydata-profiling to improve the quality of your streaming flows. Buckle up!

Stream processing enables real-time analysis of data in flight, before it is stored, and can be stateful or stateless.

Stateful stream processing is used for real-time recommendations, pattern detection, or complex event processing, where the history of what has happened is required for the processing (windows, joining by a key, etc.).

Stateless stream processing is used for inline transformations that don’t require knowledge of other data points in the stream, like masking an email or converting a type.

Overall, data streams are widely used in the industry and can be found in use cases such as fraud detection, patient monitoring, or predictive maintenance.

One Crucial Aspect That All Data Streams Must Consider Is the Quality of the Data

Unlike traditional models, where data quality is usually assessed during the creation of the data warehouse or dashboard solution, streaming data requires continuous monitoring. It is essential to maintain data quality throughout the entire process, from collection to feeding downstream applications. After all, the cost of bad data quality can be high for organizations:

“The cost of bad data is an astonishing 15% to 25% of revenue for most companies. (…) Two-thirds of these costs can be eliminated by getting in front on data quality.” (Thomas C. Redman, author of “Getting in Front on Data Quality”)

Throughout this article, we will show you how to combine bytewax with ydata-profiling to profile and improve the quality of your streaming flows!

Stream Processing for Data Professionals With Bytewax

Bytewax is an OSS stream processing framework designed specifically for Python developers.
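Before turning to Bytewax’s own API, the stateless/stateful distinction from the introduction can be illustrated in plain Python. This is a framework-free sketch, not Bytewax code; the event fields (user_id, email) and the helper names mask_email and running_count are hypothetical. The stateless step transforms each event in isolation, while the stateful step must remember a count per key across events:

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, Tuple

Event = Dict[str, str]


def mask_email(event: Event) -> Event:
    """Stateless: the output depends only on the current event."""
    user, _, domain = event["email"].partition("@")
    masked = dict(event)  # copy, so the input event is left untouched
    masked["email"] = f"{user[:1]}***@{domain}"
    return masked


def running_count(events: Iterable[Event], key: str) -> Iterator[Tuple[str, int]]:
    """Stateful: the output depends on how many events with the same key came before."""
    counts: Dict[str, int] = defaultdict(int)
    for event in events:
        counts[event[key]] += 1
        yield event[key], counts[event[key]]


# Hypothetical events, for illustration only
stream = [
    {"user_id": "a", "email": "alice@example.com"},
    {"user_id": "b", "email": "bob@example.com"},
    {"user_id": "a", "email": "alice@example.com"},
]

masked_stream = [mask_email(e) for e in stream]
counts = list(running_count(stream, "user_id"))
print(masked_stream[0]["email"])  # a***@example.com
print(counts)                     # [('a', 1), ('b', 1), ('a', 2)]
```

In a real stream, the per-key state held in running_count would also need to survive restarts and be split across workers, which is the kind of bookkeeping (recovery, scalability) a stream processing framework takes care of.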
Bytewax allows users to build streaming data pipelines and real-time applications with capabilities similar to Flink, Spark, and Kafka Streams, while providing a friendly and familiar interface and 100% compatibility with the Python ecosystem.

Using built-in connectors or existing Python libraries, you can connect to real-time and streaming data sources (Kafka, RedPanda, WebSocket, etc.) and write transformed data out to various downstream systems (Kafka, Parquet files, data lakes, etc.).

For the transformations, Bytewax facilitates stateful and stateless transformations with map, windowing, and aggregation methods, and comes with familiar features such as recovery and scalability.

Bytewax offers a Python-first and data-centric experience with data streams and is purposely built for data engineers and data scientists. It lets users create the customizations necessary to meet their needs without having to learn and maintain JVM-based streaming platforms like Spark or Flink.

Bytewax is well suited for many use cases, namely Embedding Pipelines for Generative AI, Handling Missing Values in Data Streams, Using Language Models in a Streaming Context to Understand Financial Markets, and more.

For use case inspiration and more information like documentation, tutorials, and guides, feel free to check the bytewax website.

Why Data Profiling for Data Streams?

Data profiling is key to a successful start of any machine learning task and refers to the step of thoroughly understanding our data: its structure, behavior, and quality.

In a nutshell, data profiling involves analyzing aspects related to the data’s format and basic descriptors (e.g., number of samples, number/types of features, duplicate values), its intrinsic characteristics (such as the presence of missing data or imbalanced features), and other complicating factors that may arise during data collection or processing (e.g., erroneous values or inconsistent features).
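To make those basic descriptors concrete, here is a minimal plain-Python sketch that computes a few of them (number of samples, number of features, a naive numeric/categorical type guess, missing values, and duplicate rows) over rows shaped like CSV records. It is not how ydata-profiling works internally; the function names and sample rows are hypothetical, and treating empty strings as missing is an assumption:

```python
from collections import Counter
from typing import Any, Dict, List, Optional

Row = Dict[str, Optional[str]]


def is_missing(value: Optional[str]) -> bool:
    # Assumption: None and empty strings both count as missing
    return value is None or value == ""


def infer_type(values: List[Optional[str]]) -> str:
    """Naive guess: 'numeric' if every non-missing value parses as a float."""
    present = [v for v in values if not is_missing(v)]
    if not present:
        return "empty"
    try:
        for v in present:
            float(v)
    except ValueError:
        return "categorical"
    return "numeric"


def basic_profile(rows: List[Row]) -> Dict[str, Any]:
    columns = sorted({col for row in rows for col in row})
    # Exact duplicate rows: every copy after the first one is counted
    row_keys = Counter(tuple(row.get(c) for c in columns) for row in rows)
    duplicates = sum(n - 1 for n in row_keys.values())
    features = {
        col: {
            "type": infer_type([row.get(col) for row in rows]),
            "missing": sum(is_missing(row.get(col)) for row in rows),
        }
        for col in columns
    }
    return {
        "n_samples": len(rows),
        "n_features": len(columns),
        "duplicate_rows": duplicates,
        "features": features,
    }


# Hypothetical rows, for illustration only
rows = [
    {"device": "dev-a", "temp": "22.5"},
    {"device": "dev-a", "temp": "22.5"},
    {"device": "dev-b", "temp": ""},
]
summary = basic_profile(rows)
print(summary["n_samples"], summary["duplicate_rows"])  # 3 1
print(summary["features"]["temp"])  # {'type': 'numeric', 'missing': 1}
```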
Ensuring high data quality standards is crucial for all domains and organizations, but it is especially relevant for domains that output continuous data, where circumstances might change fast and may require immediate action (e.g., healthcare monitoring, stock values, air quality policies).

For many domains, data profiling is used through an exploratory data analysis lens, considering historical data stored in databases. In contrast, for data streams, data profiling becomes essential for validation and quality control continuously along the stream, where data needs to be checked at different time frames or stages of the process.

By embedding automated profiling into our data flows, we can immediately get feedback on the current state of our data and be alerted to any potentially critical issues, whether they are related to data consistency and integrity (e.g., corrupted values or changing formats) or to events happening in short periods of time (e.g., data drifts, deviations from business rules and outcomes).

In real-world domains, where you just know Murphy’s law is bound to strike and “everything can definitely go wrong”, automated profiling might save us from multiple brain puzzles and from systems needing to be taken out of production!

When it comes to data profiling, ydata-profiling has consistently been a crowd favorite, for both tabular and time-series data. And no wonder why: it’s one line of code for an extensive set of analyses and insights.

Complex and time-consuming operations are done under the hood: ydata-profiling automatically detects the feature types comprised in the data and, depending on the feature types (either numeric or categorical), adjusts the summary statistics and visualizations that are shown in the profiling report.

Fostering a data-centric analysis, the package also highlights the existing relationships between features, focusing on their pairwise interactions and correlations, and provides a thorough evaluation of data quality alerts, from duplicate or constant values to skewed and imbalanced features.
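Some of the alert categories mentioned above can be approximated with simple rules. The sketch below flags constant, imbalanced, and skewed columns; the rule set, the thresholds (a 90% share for the dominant value, an absolute skewness above 1.0), and the function names are hypothetical choices for illustration, not ydata-profiling’s actual defaults or implementation:

```python
from collections import Counter
from statistics import mean, pstdev
from typing import List, Sequence


def skewness(xs: Sequence[float]) -> float:
    """Population (Fisher-Pearson) skewness; 0.0 when all values are equal."""
    mu, sigma = mean(xs), pstdev(xs)
    if sigma == 0:
        return 0.0
    return sum(((x - mu) / sigma) ** 3 for x in xs) / len(xs)


def quality_alerts(
    column: str,
    values: List,
    imbalance_threshold: float = 0.9,  # hypothetical threshold
    skew_threshold: float = 1.0,       # hypothetical threshold
) -> List[str]:
    if not values:
        return []
    counts = Counter(values)
    if len(counts) == 1:
        return [f"{column}: CONSTANT"]
    alerts = []
    top_share = counts.most_common(1)[0][1] / len(values)
    if top_share >= imbalance_threshold:
        alerts.append(f"{column}: IMBALANCED (top value share {top_share:.0%})")
    # Only compute skewness for genuinely numeric columns (bools excluded)
    if all(isinstance(v, (int, float)) and not isinstance(v, bool) for v in values):
        s = skewness(values)
        if abs(s) > skew_threshold:
            alerts.append(f"{column}: SKEWED (skewness {s:.2f})")
    return alerts


print(quality_alerts("motion", [False] * 10))          # ['motion: CONSTANT']
print(quality_alerts("light", [True] * 19 + [False]))  # ['light: IMBALANCED (top value share 95%)']
print(quality_alerts("co", [1, 2, 3, 4, 5, 6, 7, 8, 9, 100]))
```

The column names echo the sensor dataset used later, but the values are made up; a real profiler would also tune thresholds per domain.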
All in all, ydata-profiling gives us a 360º view of the quality of our data, with minimal effort.

Putting It All Together: bytewax and ydata-profiling

Before starting the project, we first need to set up our Python dependencies and configure our data source.

First, let’s install the bytewax and ydata-profiling packages (you might want to use a virtual environment for this; check these instructions if you need some extra guidance!):

```bash
pip install bytewax==0.16.2 ydata-profiling==4.3.1
```

Then, we’ll download the Environmental Sensor Telemetry Dataset (licensed under CC0: Public Domain), which contains several measurements of temperature, humidity, carbon monoxide, liquefied petroleum gas, smoke, light, and motion from different IoT devices:

```bash
wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000
```

In a production environment, these measurements would be continuously generated by each device, and the input would look like what we expect in a streaming platform such as Kafka. In this article, to simulate the context we would find with streaming data, we will read the data from the CSV file one line at a time and create a dataflow using bytewax.

(As a quick side note, a dataflow is essentially a data pipeline that can be described as a directed acyclic graph, or DAG.)

Next, let’s make the necessary imports:

```python
from datetime import datetime, timedelta, timezone

from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.files import CSVInput
from bytewax.testing import run_main  # optional: runs a flow in-process, e.g., in a notebook
```

Then, we define our dataflow object. Afterward, we will use a stateless map method where we pass in a function to convert the timestamp string to a datetime object and restructure the data to the format (device_id, data).
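Before wiring this into Bytewax, it can help to check what the simulated source and the two map steps do to a single row. This plain-Python sketch replays CSV rows one at a time with csv.DictReader (which yields every value as a string), then applies the same timestamp parsing and (device_id, data) remapping. The two-row sample, its device names, and the temp column are hypothetical; only the ts and device columns are taken from the dataflow code:

```python
import csv
import io
from datetime import datetime, timezone
from typing import Dict, Iterator, TextIO, Tuple


def replay_csv(handle: TextIO) -> Iterator[Dict[str, str]]:
    """Yield CSV rows one at a time, as a streaming source would deliver events."""
    yield from csv.DictReader(handle)


def parse_time(reading: Dict) -> Dict:
    # Unix timestamp string -> timezone-aware UTC datetime
    reading["ts"] = datetime.fromtimestamp(float(reading["ts"]), timezone.utc)
    return reading


def key_by_device(reading: Dict) -> Tuple[str, Dict]:
    return reading["device"], reading


# Hypothetical two-row sample
sample = io.StringIO(
    "ts,device,temp\n"
    "1594512094.0,dev-a,22.7\n"
    "1594512104.0,dev-b,19.7\n"
)

events = [key_by_device(parse_time(row)) for row in replay_csv(sample)]
device_id, reading = events[0]
print(device_id, reading["ts"].isoformat())  # dev-a 2020-07-12T00:01:34+00:00
```

Note that the other measurements (temp here) stay strings after these two steps; converting them is a separate, optional transformation.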
The map method applies the change to each data point in a stateless way. We reshape the data into (device_id, data) tuples so that, in the next steps, we can easily group the data and profile each device separately rather than all of the devices at once.

```python
flow = Dataflow()
# Read the CSV one row at a time; each row arrives as a dict.
# The path assumes the file was downloaded to /content (e.g., in Google Colab); adjust it to your setup.
flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000"))

# Parse the Unix timestamp string into a timezone-aware UTC datetime
def parse_time(reading_data):
    reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc)
    return reading_data

flow.map(parse_time)

# Remap each reading to a (device_id, reading_data) tuple so later steps can key on the device
flow.map(lambda reading_data: (reading_data["device"], reading_data))
```

Now, we will take advantage of the stateful capabilities of bytewax to gather data for each device over the duration of time that we have defined. ydata-profiling expects a snapshot of the data over time, which makes the window operator the perfect method to use to do this.

In ydata-profiling, we are able to produce summary statistics for a dataframe that is specified for a particular context. For instance, in our example, we can produce snapshots of data referring to each IoT device or to particular time frames:

```python
from bytewax.window import EventClockConfig, TumblingWindow

# Accumulator function: appends each reading to the window's list of readings
def acc_values(acc, reading):
    acc.append(reading)
    return acc

# This function instructs the event clock on how to retrieve the
# event's datetime from the input.
def get_time(reading):
    return reading["ts"]

# Configure the `fold_window` operator to use the event time;
# wait_for_system_duration sets how much system time to wait before considering an event late.
cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30))

# And a one-hour tumbling (non-overlapping) window, aligned to 2020-01-01 UTC
align_to = datetime(2020, 1, 1, tzinfo=timezone.utc)
wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1))

# For each device, start every window with an empty list and fold the readings into it
flow.fold_window("collect_readings", cc, wc, list, acc_values)

flow.inspect(print)
```

After the snapshots are defined, leveraging ydata-profiling is as simple as calling ProfileReport for each of the dataframes we would like to analyze:

```python
import pandas as pd
from ydata_profiling import ProfileReport

def profile(device_id__readings):
    print(device_id__readings)  # debug: show the raw window contents
    device_id, readings = device_id__readings
    # Label the report with the hour of the first reading in the window
    start_time = readings[0]["ts"].replace(minute=0, second=0, microsecond=0).strftime("%Y-%m-%d %H:%M:%S")
    df = pd.DataFrame(readings)
    report = ProfileReport(
        df,
        tsmode=True,
        sortby="ts",
        title=f"Sensor Readings - device: {device_id}",
    )
    report.to_file(f"Ts_Profile_{device_id}-{start_time}.html")
    return f"device {device_id} profiled at hour {start_time}"

flow.map(profile)
```

In this example, we are writing the reports out to local HTML files as part of a function in a map step. In the future, these could be sent out via a messaging tool, or we could save them to some remote storage.

Once the profile is complete, the dataflow expects some output, so we can use the built-in StdOutput to print the device that was profiled and the time it was profiled at, as returned by the profile function in the map step:

```python
flow.output("out", StdOutput())
```

There are multiple ways to execute Bytewax dataflows. In this example, we run everything on a single local machine, but Bytewax can also run on multiple Python processes, across multiple hosts, in a Docker container, using a Kubernetes cluster, and more.

In this article, we’ll continue with a local setup, but we encourage you to check our helper tool waxctl, which manages Kubernetes dataflow deployments once your pipeline is ready to transition to production.
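To see which readings end up in the same snapshot, the one-hour tumbling-window assignment configured above can be reproduced in a few lines: a reading with timestamp ts belongs to the window starting at align_to + floor((ts - align_to) / length) × length, and readings are grouped per (device, window start). This is a simplified plain-Python sketch, not Bytewax’s implementation; among other things, it ignores late events and the wait_for_system_duration setting, and the sample readings are hypothetical:

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, List, Tuple

Reading = Dict[str, datetime]


def window_start(ts: datetime, align_to: datetime, length: timedelta) -> datetime:
    """Start of the tumbling window [start, start + length) that contains ts."""
    return align_to + ((ts - align_to) // length) * length


def group_into_windows(
    events: Iterable[Tuple[str, Reading]], align_to: datetime, length: timedelta
) -> Dict[Tuple[str, datetime], List[Reading]]:
    """Collect readings per (device_id, window start), like a list-building fold."""
    windows: Dict[Tuple[str, datetime], List[Reading]] = defaultdict(list)
    for device_id, reading in events:
        windows[(device_id, window_start(reading["ts"], align_to, length))].append(reading)
    return dict(windows)


utc = timezone.utc
align_to = datetime(2020, 1, 1, tzinfo=utc)
length = timedelta(hours=1)

# Hypothetical keyed readings, already in (device_id, reading) form
events = [
    ("dev-a", {"ts": datetime(2020, 7, 12, 0, 1, 34, tzinfo=utc)}),
    ("dev-a", {"ts": datetime(2020, 7, 12, 0, 59, 0, tzinfo=utc)}),
    ("dev-a", {"ts": datetime(2020, 7, 12, 1, 0, 0, tzinfo=utc)}),
    ("dev-b", {"ts": datetime(2020, 7, 12, 0, 30, 0, tzinfo=utc)}),
]

for (device_id, start), readings in sorted(group_into_windows(events, align_to, length).items()):
    print(device_id, start.isoformat(), len(readings))
# dev-a 2020-07-12T00:00:00+00:00 2
# dev-a 2020-07-12T01:00:00+00:00 1
# dev-b 2020-07-12T00:00:00+00:00 1
```

Because the windows are half-open, a reading at exactly 01:00:00 starts a new snapshot instead of closing the previous one.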
Assuming we are in the same directory as the file with the dataflow definition, we can run the dataflow using:

```bash
python -m bytewax.run ydata-profiling-streaming:flow
```

We can then use the profiling reports to validate the data quality, check for changes in schemas or data formats, and compare the data characteristics between different devices or time windows.

In fact, we can leverage the comparison report functionality, which highlights the differences between two data profiles in a straightforward manner, making it easier for us to detect important patterns that need to be investigated or issues that have to be addressed:

```python
from ydata_profiling import ProfileReport

# df_a and df_b: two pandas DataFrames holding the snapshots to compare
# (for example, one device's readings in two different hourly windows)
snapshot_a_report = ProfileReport(df_a, title="Snapshot A")
snapshot_b_report = ProfileReport(df_b, title="Snapshot B")

comparison_report = snapshot_a_report.compare(snapshot_b_report)
comparison_report.to_file("comparison_report.html")
```

Ready to Explore Your Own Data Streams?

Validating data streams is crucial to identify data quality issues continuously and to compare the state of data across distinct periods of time.

For organizations in healthcare, energy, manufacturing, and entertainment, all working with continuous streams of data, automated profiling is key to establishing data governance best practices, from quality assessment to data privacy.

This requires the analysis of snapshots of data which, as showcased in this article, can be achieved in a seamless way by combining bytewax and ydata-profiling.

Bytewax takes care of all the processes necessary to handle and structure data streams into snapshots, which can then be summarized and compared with ydata-profiling through a comprehensive report of data characteristics.
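ydata-profiling’s comparison report contrasts two snapshots across every feature at once. As a rough illustration of the underlying idea for a single numeric column, this plain-Python sketch compares two snapshots’ means and missing-value rates and flags a possible drift when the mean moves by more than a relative threshold. The 20% threshold, the function name, and the sample values are hypothetical, and a mean shift is only one of many possible drift signals:

```python
from statistics import mean
from typing import Dict, List, Optional

Row = Dict[str, Optional[str]]


def _is_missing(value: Optional[str]) -> bool:
    return value is None or value == ""


def compare_snapshots(a: List[Row], b: List[Row], column: str, rel_threshold: float = 0.2) -> Dict[str, object]:
    """Compare one numeric column across two snapshots (each needs at least one non-missing value)."""
    values_a = [float(r[column]) for r in a if not _is_missing(r.get(column))]
    values_b = [float(r[column]) for r in b if not _is_missing(r.get(column))]
    mean_a, mean_b = mean(values_a), mean(values_b)
    if mean_a == 0:
        rel_change = 0.0 if mean_b == 0 else float("inf")
    else:
        rel_change = abs(mean_b - mean_a) / abs(mean_a)
    return {
        "mean_a": mean_a,
        "mean_b": mean_b,
        "missing_a": sum(_is_missing(r.get(column)) for r in a) / len(a),
        "missing_b": sum(_is_missing(r.get(column)) for r in b) / len(b),
        "relative_mean_change": rel_change,
        "drift": rel_change > rel_threshold,
    }


# Hypothetical hourly snapshots of one device's temperature readings
hour_1 = [{"temp": "20.0"}, {"temp": "22.0"}]
hour_2 = [{"temp": "26.0"}, {"temp": "28.0"}, {"temp": ""}]

result = compare_snapshots(hour_1, hour_2, "temp")
print(result["mean_a"], result["mean_b"], result["drift"])  # 21.0 27.0 True
```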
Being able to appropriately process and profile incoming data opens up a plethora of use cases across different domains, from the correction of errors in data schemas and formats to the highlighting and mitigation of additional issues that derive from real-world activities, such as anomaly detection (e.g., fraud or intrusion/threat detection), equipment malfunction, and other events that deviate from expectations (e.g., data drifts or misalignment with business rules).

Now, you’re all set to start exploring your data streams! Let us know what other use cases you find, and as always, feel free to drop us a line in the comments or find us at the Data-Centric AI Community for further questions and suggestions! See you there!

Acknowledgments

This article was written by Fabiana Clemente (Co-founder & CDO @ YData) and Miriam Santos (Developer Relations @ YData), developing ydata-profiling, and Zander Matheson (CEO & Founder @ Bytewax) and Oli Makhasoeva (Developer Relations @ Bytewax), developing bytewax.

You may find additional information about the OSS packages in their respective documentation: ydata-profiling docs & bytewax docs.

Also published here