In this blog post, we will cover how you can combine the open-source streaming solution, bytewax, with ydata-profiling to improve the quality of your streaming flows. Buckle up!
Stream processing enables real-time analysis of data in-flight and before storage and can be stateful or stateless.
Stateful stream processing is used for real-time recommendations, pattern detection, or complex event processing, where the history of what has happened is required for the processing (windows, joining by a key, etc.).
Stateless stream processing is used for inline transformation that doesn’t require knowledge of other data points in the stream like masking an email or converting a type.
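To make the distinction concrete, here is a minimal, self-contained sketch of stateless transformations in plain Python. The record fields ("email", "age") are illustrative and not from this article's dataset; each record is processed on its own, with no knowledge of the rest of the stream.

```python
# Stateless transformations: each record is handled independently,
# with no state shared across data points in the stream.

def mask_email(record: dict) -> dict:
    """Replace the local part of an email address with asterisks."""
    local, _, domain = record["email"].partition("@")
    record["email"] = "*" * len(local) + "@" + domain
    return record

def cast_age(record: dict) -> dict:
    """Convert a string field to an integer (a simple type conversion)."""
    record["age"] = int(record["age"])
    return record

record = {"email": "jane.doe@example.com", "age": "42"}
record = cast_age(mask_email(record))
print(record)  # {'email': '********@example.com', 'age': 42}
```

In a streaming engine, each of these functions would be passed to a map-style operator and applied to every record as it flows through.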
Overall, data streams are widely used in the industry and can be found applied to use cases such as fraud detection, patient monitoring, or even predictive maintenance.
Unlike traditional models where data quality is usually assessed during the creation of the data warehouse or dashboard solution, streaming data requires continuous monitoring.
It is essential to maintain data quality throughout the entire process, from collection to feeding downstream applications. After all, the cost of bad data quality can be high for organizations:
“The cost of bad data is an astonishing 15% to 25% of revenue for most companies. (…) Two-thirds of these costs can be eliminated by getting in front on data quality.”
— Thomas C. Redman, author of “Getting in Front on Data Quality”
Throughout this article, we will show you how you can combine bytewax with ydata-profiling to profile and improve the quality of your streaming flows!
Bytewax

Bytewax is an open-source stream processing framework designed for Python. It allows users to build streaming data pipelines and real-time applications with capabilities similar to Flink, Spark, and Kafka Streams, while providing a friendly and familiar interface and 100% compatibility with the Python ecosystem. Users can create the customizations necessary to meet their needs without having to learn and maintain JVM-based streaming platforms like Spark or Flink.

Using built-in connectors, Bytewax can read data from sources such as local files and Kafka and write results back out. For the transformations, Bytewax facilitates stateful and stateless transformations with map, windowing, and aggregation methods, and comes with familiar features such as recovery and scalability.

Bytewax is well suited for many streaming use cases. For use case inspiration and more information like documentation, tutorials, and guides, feel free to check the Bytewax documentation.
Data profiling is key to a successful start of any machine learning task and refers to the step of performing an exploratory analysis of the data to summarize its main characteristics, such as feature types, distributions, and quality issues. In a nutshell, data profiling gives us a first, comprehensive view of the data we are working with.
Ensuring high data quality standards is crucial for all domains and organizations, but it is especially relevant for domains operating with continuous data, where circumstances might change fast and may require immediate action (e.g., healthcare monitoring, stock values, air quality policies).
For many domains, data profiling is used from an exploratory data analysis lens, considering historical data stored in databases. On the contrary, for data streams, data profiling becomes essential for validation and quality control continuously along the stream, where data needs to be checked at different time frames or stages of the process.
By embedding an automated profiling into our data flows, we can immediately get feedback on the current state of our data and be alerted for any potentially critical issues — whether they are related to data consistency and integrity (e.g., corrupted values or changing formats), or to events happening in short periods of time (e.g., data drifts, deviation from business rules and outcomes).
In real-world domains — where you just know Murphy’s law is bound to strike and “everything can definitely go wrong” — automated profiling might save us from multiple brain puzzles and systems needing to be taken out of production!
When it comes to data profiling, ydata-profiling has consistently been a go-to choice for data practitioners.
Complex and time-draining operations are done under the hood: ydata-profiling automatically detects the feature types comprised in the data, and depending on the feature types (either numeric or categorical), it adjusts the summary statistics and visualizations that are shown in the profiling report.
Fostering a data-centric analysis, the package also highlights the existing relationships between features, focusing on their pairwise interactions and correlations, and provides a thorough evaluation of data quality alerts, from duplicate or constant values to skewed and imbalanced features.
It’s really a 360º view of the quality of our data — with minimal effort.
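To get a feel for what the report automates, here is a rough, hand-rolled pandas sketch of a few of the checks ydata-profiling performs for you out of the box: type inference, missing values, constant columns, and summary statistics. The sample data below is made up for illustration.

```python
import pandas as pd

# A hand-rolled sketch of a few checks that a profiler automates.
# The columns here are illustrative, not from the article's dataset.
df = pd.DataFrame({
    "temperature": [21.0, 21.5, None, 22.1],
    "device": ["a", "a", "b", "b"],
    "firmware": ["v1", "v1", "v1", "v1"],  # constant column -> a typical quality alert
})

print(df.dtypes)                              # inferred feature types
print(df.isna().sum())                        # missing values per column
print(df.nunique())                           # constant columns have nunique == 1
print(df.select_dtypes("number").describe())  # summary statistics for numeric features
```

ydata-profiling bundles all of this (and much more, such as correlations and alerts) into a single HTML report, which is why it pairs so well with automated pipelines.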
Before starting the project, we first need to set up our Python dependencies and configure our data source.
First, let's install the bytewax and ydata-profiling packages (you might want to use a virtual environment for this):
pip install bytewax==0.16.2 ydata-profiling==4.3.1
Then, we'll download the sample of IoT telemetry data that we will use to simulate a stream of device readings:
wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000
In a production environment, these measurements would be continuously generated by each device, and the input would look like what we expect in a streaming platform.
(As a quick side note, a dataflow is essentially a data pipeline that can be described as a directed acyclic graph — DAG)
First, let’s make some necessary imports:
from datetime import datetime, timedelta, timezone
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.files import CSVInput
from bytewax.testing import run_main
Then, we define our dataflow object. Afterward, we will use a stateless map method where we pass in a function to convert the timestamp string to a datetime object and restructure the data into the format (device_id, data).
The map method will make the change to each data point in a stateless way. The reason we have modified the shape of our data is so that we can easily group the data in the next steps to profile data for each device separately rather than for all of the devices simultaneously.
flow = Dataflow()
flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000"))
# parse timestamp
def parse_time(reading_data):
reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc)
return reading_data
flow.map(parse_time)
# remap format to tuple (device_id, reading_data)
flow.map(lambda reading_data: (reading_data['device'], reading_data))
Now, we will take advantage of the stateful capabilities of bytewax to gather data for each device over the duration of time that we have defined. ydata-profiling expects a snapshot of the data over time, which makes the window operator the perfect method to use.

With ydata-profiling, we are able to produce summarizing statistics for a dataframe that is specified for a particular context. For instance, in our example, we can produce snapshots of data referring to each IoT device or to particular time frames:
from bytewax.window import EventClockConfig, TumblingWindow
# This is the accumulator function, and outputs a list of readings
def acc_values(acc, reading):
acc.append(reading)
return acc
# This function instructs the event clock on how to retrieve the
# event's datetime from the input.
def get_time(reading):
return reading["ts"]
# Configure the `fold_window` operator to use the event time.
cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30))
# And a tumbling window
align_to = datetime(2020, 1, 1, tzinfo=timezone.utc)
wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1))
flow.fold_window("running_average", cc, wc, list, acc_values)
flow.inspect(print)
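Outside of Bytewax, the accumulation performed by fold_window can be sketched in plain Python: each reading is appended to a per-(device, window) list, where the window is derived from the event time, mirroring the acc_values and get_time functions above. The readings below are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

# Same window configuration as the dataflow above.
ALIGN_TO = datetime(2020, 1, 1, tzinfo=timezone.utc)
WINDOW = timedelta(hours=1)

def window_index(ts: datetime) -> int:
    """Which tumbling window (counted from ALIGN_TO) a timestamp falls in."""
    return (ts - ALIGN_TO) // WINDOW

# Hypothetical keyed readings, shaped like (device_id, reading) after the map step.
readings = [
    ("dev-1", {"ts": ALIGN_TO + timedelta(minutes=10), "co": 0.004}),
    ("dev-1", {"ts": ALIGN_TO + timedelta(minutes=50), "co": 0.005}),
    ("dev-1", {"ts": ALIGN_TO + timedelta(minutes=70), "co": 0.006}),
]

# Fold: start from the builder (an empty list) and append each reading,
# keyed by (device_id, window), just like acc_values does per window.
windows = defaultdict(list)
for device_id, reading in readings:
    windows[(device_id, window_index(reading["ts"]))].append(reading)

print({key: len(vals) for key, vals in windows.items()})
# {('dev-1', 0): 2, ('dev-1', 1): 1}
```

Bytewax handles the extra complexity for us: tracking event time per key, waiting for late data (wait_for_system_duration), and emitting each window's accumulated list downstream when it closes.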
After the snapshots are defined, leveraging ydata-profiling is as simple as calling ProfileReport for each of the dataframes we would like to analyze:
import pandas as pd
from ydata_profiling import ProfileReport
def profile(device_id__readings):
    print(device_id__readings)
    device_id, readings = device_id__readings
    start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')
    df = pd.DataFrame(readings)
    profile = ProfileReport(
        df,
        tsmode=True,
        sortby="ts",
        title=f"Sensor Readings - device: {device_id}"
    )
    profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html")
    return f"device {device_id} profiled at hour {start_time}"
flow.map(profile)
In this example, we are writing the reports out to local files as part of a function in a map method. In the future, these could be sent out via a messaging tool or saved to remote storage.
Once the profile is complete, the dataflow expects some output, so we can use the built-in StdOutput connector to print the device that was profiled and the time at which it was profiled, as returned from the profile function in the map step:
flow.output("out", StdOutput())
There are multiple ways to execute Bytewax dataflows. In this example, we use a single local machine, but Bytewax can also run on multiple Python processes, across multiple hosts, in a Docker container, or on a Kubernetes cluster. In this article, we'll continue with a local setup, but we encourage you to check out waxctl, Bytewax's helper tool for managing dataflow deployments, once your pipelines are ready to move to production.
Assuming we are in the same directory as the file with the dataflow definition, we can run it using:
python -m bytewax.run ydata-profiling-streaming:flow
We can then use the profiling reports to validate the data quality, check for changes in schemas or data formats, and compare the data characteristics between different devices or time windows.
In fact, we can leverage the report comparison functionality of ydata-profiling to put two data profiles side by side and highlight their differences:
snapshot_a_report = ProfileReport(df_a, title="Snapshot A")
snapshot_b_report = ProfileReport(df_b, title="Snapshot B")
comparison_report = snapshot_a_report.compare(snapshot_b_report)
comparison_report.to_file("comparison_report.html")
Validating data streams is crucial to identify issues in data quality in a continuous manner and compare the state of data across distinct periods of time.
For organizations in healthcare, energy, manufacturing, and entertainment — all working with continuous streams of data — automated profiling is key to establishing data governance best practices, from quality assessment to data privacy.
This requires the analysis of snapshots of data which, as showcased in this article, can be achieved in a seamless way by combining bytewax and ydata-profiling.
Bytewax takes care of all the processes necessary to handle and structure data streams into snapshots, which can then be summarized and compared with ydata-profiling through a comprehensive report of data characteristics.
Being able to appropriately process and profile incoming data opens up a plethora of use cases across different domains. It helps correct errors in data schemas and formats, and it highlights and mitigates issues that derive from real-world activities, such as anomaly detection (e.g., fraud or intrusion/threat detection), equipment malfunction, and other events that deviate from expectations (e.g., data drifts or misalignment with business rules).
Now, you're all set to start exploring your data streams! Let us know what other use cases you find, and as always, feel free to drop us a line in the comments.
This article was written by Fabiana Clemente (Co-founder & CDO @ YData).
You may find additional information about the OSS packages in the respective documentations: