Trong bài đăng trên blog này, chúng tôi sẽ đề cập đến cách bạn có thể kết hợp và tận dụng giải pháp phát trực tuyến nguồn mở, bytewax , với ydata-profiling , để cải thiện chất lượng luồng phát trực tuyến của bạn. Thắt dây an toàn!
Quá trình xử lý luồng cho phép phân tích dữ liệu trong thời gian thực và trước khi lưu trữ và có thể ở trạng thái hoặc không trạng thái .
Xử lý luồng trạng thái được sử dụng cho đề xuất thời gian thực , phát hiện mẫu hoặc xử lý sự kiện phức tạp, trong đó lịch sử của những gì đã xảy ra là bắt buộc đối với quá trình xử lý (cửa sổ, tham gia bằng khóa, v.v.).
Xử lý luồng không trạng thái được sử dụng để chuyển đổi nội tuyến không yêu cầu kiến thức về các điểm dữ liệu khác trong luồng như ẩn email hoặc chuyển đổi loại.
Nhìn chung, các luồng dữ liệu được sử dụng rộng rãi trong ngành và có thể được áp dụng cho các trường hợp sử dụng như phát hiện gian lận , theo dõi bệnh nhân hoặc bảo trì dự đoán sự kiện .
Không giống như các mô hình truyền thống nơi chất lượng dữ liệu thường được đánh giá trong quá trình tạo giải pháp bảng điều khiển hoặc kho dữ liệu, truyền dữ liệu yêu cầu giám sát liên tục .
Điều cần thiết là duy trì chất lượng dữ liệu trong toàn bộ quá trình, từ thu thập đến cung cấp cho các ứng dụng xuôi dòng. Xét cho cùng, chi phí của chất lượng dữ liệu kém có thể cao đối với các tổ chức:
“Cái giá của dữ liệu xấu chiếm tới 15% đến 25% doanh thu đáng kinh ngạc đối với hầu hết các công ty. (…) Hai phần ba chi phí này có thể được loại bỏ bằng cách đi đầu về chất lượng dữ liệu.”
— Thomas C. Redman, tác giả cuốn “Dẫn đầu về chất lượng dữ liệu”
Trong suốt bài viết này, chúng tôi sẽ chỉ cho bạn cách bạn có thể kết hợp bytewa
với ydata-profiling
để lập hồ sơ và cải thiện chất lượng luồng phát của bạn!
Nó cho phép người dùng xây dựng các đường ống truyền dữ liệu và ứng dụng thời gian thực với các khả năng tương tự như Flink, Spark và Kafka Streams đồng thời cung cấp giao diện thân thiện và quen thuộc cũng như khả năng tương thích 100% với hệ sinh thái Python.
Sử dụng tích hợp
Đối với các phép biến đổi, Bytewax tạo điều kiện cho các phép biến đổi trạng thái và không trạng thái bằng các phương thức bản đồ , cửa sổ và tổng hợp , đồng thời đi kèm với các tính năng quen thuộc như khôi phục và khả năng mở rộng.
sáp ong
Nó cho phép người dùng xây dựng các đường dẫn truyền dữ liệu và ứng dụng thời gian thực , đồng thời tạo các tùy chỉnh cần thiết để đáp ứng nhu cầu của họ mà không cần phải tìm hiểu và duy trì các nền tảng phát trực tuyến dựa trên JVM như Spark hoặc Flink.
Bytewax rất phù hợp cho nhiều trường hợp sử dụng, cụ thể là,
Để biết cảm hứng về trường hợp sử dụng và biết thêm thông tin như tài liệu, hướng dẫn và hướng dẫn, vui lòng kiểm tra
Lập hồ sơ dữ liệu là chìa khóa để bắt đầu thành công bất kỳ nhiệm vụ học máy nào và đề cập đến bước
Tóm lại,
Việc đảm bảo các tiêu chuẩn chất lượng dữ liệu cao là rất quan trọng đối với tất cả các miền và tổ chức, nhưng đặc biệt phù hợp với các miền hoạt động với các miền xuất dữ liệu liên tục , trong đó hoàn cảnh có thể thay đổi nhanh chóng và có thể yêu cầu hành động ngay lập tức (ví dụ: giám sát chăm sóc sức khỏe, giá trị kho hàng, chính sách chất lượng không khí).
Đối với nhiều miền, cấu hình dữ liệu được sử dụng từ lăng kính phân tích dữ liệu khám phá, xem xét dữ liệu lịch sử được lưu trữ trong cơ sở dữ liệu. Ngược lại, đối với các luồng dữ liệu, việc lập hồ sơ dữ liệu trở nên cần thiết để xác thực và kiểm soát chất lượng liên tục dọc theo luồng , trong đó dữ liệu cần được kiểm tra ở các khung thời gian hoặc giai đoạn khác nhau của quy trình.
Bằng cách nhúng hồ sơ tự động vào luồng dữ liệu của mình , chúng tôi có thể ngay lập tức nhận phản hồi về trạng thái hiện tại của dữ liệu và được cảnh báo về bất kỳ vấn đề nghiêm trọng tiềm ẩn nào — cho dù chúng có liên quan đến tính nhất quán và tính toàn vẹn của dữ liệu (ví dụ: giá trị bị hỏng hoặc thay đổi định dạng) hoặc các sự kiện xảy ra trong khoảng thời gian ngắn (ví dụ: dữ liệu trôi dạt, sai lệch so với quy tắc và kết quả kinh doanh).
Trong các miền trong thế giới thực — nơi bạn chỉ cần biết định luật Murphy chắc chắn sẽ xảy ra và “mọi thứ chắc chắn có thể sai sót” — lập hồ sơ tự động có thể cứu chúng ta khỏi nhiều câu đố hóc búa và các hệ thống cần phải ngừng sản xuất!
Trong những gì liên quan đến lập hồ sơ dữ liệu, ydata-profiling
luôn là một
Các hoạt động phức tạp và tốn thời gian được thực hiện ẩn: cấu hình ydata tự động phát hiện các loại đối tượng có trong dữ liệu và tùy thuộc vào loại đối tượng (dạng số hoặc phân loại), nó điều chỉnh thống kê tóm tắt và hình ảnh hóa được hiển thị trong báo cáo lược tả.
Thúc đẩy phân tích tập trung vào dữ liệu , gói này cũng làm nổi bật các mối quan hệ hiện có giữa các tính năng , tập trung vào các tương tác và mối tương quan theo cặp của chúng và cung cấp đánh giá kỹ lưỡng về cảnh báo chất lượng dữ liệu , từ các giá trị trùng lặp hoặc không đổi đến các tính năng bị lệch và mất cân bằng .
Đó thực sự là chế độ xem 360º về chất lượng dữ liệu của chúng tôi — với nỗ lực tối thiểu.
Trước khi bắt đầu dự án, trước tiên chúng ta cần đặt các phụ thuộc Python và định cấu hình nguồn dữ liệu của mình.
Trước tiên, hãy cài đặt các gói cấu hình bytewax
và ydata-profiling
( Bạn có thể muốn sử dụng một môi trường ảo cho việc này —
pip install bytewax==0.16.2 ydata-profiling==4.3.1
Sau đó, chúng tôi sẽ tải lên
wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000
Trong môi trường sản xuất, các phép đo này sẽ được tạo liên tục bởi từng thiết bị và đầu vào sẽ trông giống như những gì chúng ta mong đợi trong một nền tảng phát trực tuyến
(Xin lưu ý nhanh, luồng dữ liệu về cơ bản là một đường dẫn dữ liệu có thể được mô tả dưới dạng biểu đồ tuần hoàn có hướng - DAG)
Trước tiên, hãy thực hiện một số nhập khẩu cần thiết :
from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main
Sau đó, chúng tôi xác định đối tượng luồng dữ liệu của mình. Sau đó, chúng tôi sẽ sử dụng một phương pháp bản đồ không trạng thái trong đó chúng tôi chuyển vào một hàm để chuyển đổi chuỗi thành đối tượng DateTime và cấu trúc lại dữ liệu thành định dạng (device_id, data).
Phương thức bản đồ sẽ thực hiện thay đổi đối với từng điểm dữ liệu theo cách không trạng thái. Lý do chúng tôi đã sửa đổi hình dạng dữ liệu của mình là để chúng tôi có thể dễ dàng nhóm dữ liệu trong các bước tiếp theo để lập hồ sơ dữ liệu cho từng thiết bị riêng biệt thay vì cho tất cả các thiết bị cùng một lúc.
flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data))
Bây giờ, chúng tôi sẽ tận dụng các khả năng có trạng thái của bytewax
để thu thập dữ liệu cho từng thiết bị trong khoảng thời gian mà chúng tôi đã xác định. ydata-profiling
mong đợi một ảnh chụp nhanh dữ liệu theo thời gian, điều này làm cho toán tử cửa sổ trở thành phương pháp hoàn hảo để sử dụng để thực hiện việc này.
Trong ydata-profiling
, chúng tôi có thể tạo số liệu thống kê tóm tắt cho một khung dữ liệu được chỉ định cho một ngữ cảnh cụ thể. Chẳng hạn, trong ví dụ của chúng tôi, chúng tôi có thể tạo ảnh chụp nhanh dữ liệu liên quan đến từng thiết bị IoT hoặc các khung thời gian cụ thể:
from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print)
Sau khi các ảnh chụp nhanh được xác định, việc tận dụng ydata-profiling
cũng đơn giản như gọi ProfileReport
cho từng khung dữ liệu mà chúng tôi muốn phân tích:
import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile)
Trong ví dụ này, chúng tôi đang ghi hình ảnh ra tệp cục bộ như một phần của chức năng trong phương thức bản đồ. Chúng có thể được báo cáo thông qua một công cụ nhắn tin hoặc chúng tôi có thể lưu chúng vào một số bộ lưu trữ từ xa trong tương lai.
Khi hồ sơ hoàn tất, luồng dữ liệu sẽ mong đợi một số đầu ra để chúng tôi có thể sử dụng StdOutput
tích hợp để in thiết bị đã được lập hồ sơ và thời gian thiết bị được lập hồ sơ tại thời điểm đó đã được chuyển ra khỏi chức năng hồ sơ trong bước bản đồ:
flow.output("out", StdOutput())
Có nhiều cách để thực thi luồng dữ liệu Bytewax. Trong ví dụ này, chúng tôi sử dụng cùng một máy cục bộ, nhưng Bytewax cũng có thể chạy trên nhiều quy trình Python, trên nhiều máy chủ, trong một
Trong bài viết này, chúng tôi sẽ tiếp tục với thiết lập cục bộ, nhưng chúng tôi khuyến khích bạn kiểm tra công cụ trợ giúp của chúng tôi
Giả sử chúng ta đang ở trong cùng thư mục với tệp có định nghĩa luồng dữ liệu, chúng ta có thể chạy nó bằng cách sử dụng:
python -m bytewax.run ydata-profiling-streaming:flow
Sau đó, chúng tôi có thể sử dụng các báo cáo định hình để xác thực chất lượng dữ liệu, kiểm tra các thay đổi trong lược đồ hoặc định dạng dữ liệu và so sánh các đặc điểm dữ liệu giữa các thiết bị hoặc khung thời gian khác nhau .
Trên thực tế, chúng ta có thể tận dụng
snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html")
Việc xác thực các luồng dữ liệu là rất quan trọng để xác định các vấn đề về chất lượng dữ liệu một cách liên tục và so sánh trạng thái của dữ liệu trong các khoảng thời gian khác nhau.
Đối với các tổ chức trong lĩnh vực chăm sóc sức khỏe , năng lượng , sản xuất và giải trí — tất cả đều hoạt động với các luồng dữ liệu liên tục — một hồ sơ tự động hóa là chìa khóa để thiết lập các phương pháp hay nhất về quản trị dữ liệu , từ đánh giá chất lượng đến quyền riêng tư của dữ liệu.
Điều này yêu cầu phân tích các ảnh chụp nhanh dữ liệu, như được giới thiệu trong bài viết này, có thể đạt được một cách liền mạch bằng cách kết hợp bytewax
và ydata-profiling
.
Bytewax đảm nhận tất cả các quy trình cần thiết để xử lý và cấu trúc các luồng dữ liệu thành ảnh chụp nhanh, sau đó có thể tóm tắt và so sánh với hồ sơ ydata thông qua một báo cáo toàn diện về các đặc điểm dữ liệu.
Khả năng xử lý và lập hồ sơ dữ liệu đến một cách thích hợp sẽ mở ra rất nhiều trường hợp sử dụng trên các miền khác nhau, từ sửa lỗi trong lược đồ và định dạng dữ liệu đến làm nổi bật và giảm thiểu các sự cố bổ sung xuất phát từ các hoạt động trong thế giới thực, chẳng hạn như phát hiện bất thường (ví dụ: phát hiện gian lận hoặc xâm nhập/đe dọa), sự cố thiết bị và các sự kiện khác không như mong đợi (ví dụ: dữ liệu trôi dạt hoặc sai lệch với quy tắc kinh doanh).
Bây giờ, bạn đã sẵn sàng để bắt đầu khám phá các luồng dữ liệu của mình! Hãy cho chúng tôi biết những trường hợp sử dụng khác mà bạn tìm thấy và như mọi khi, vui lòng gửi cho chúng tôi một dòng trong nhận xét hoặc tìm chúng tôi tại
Bài viết này được viết bởi Fabiana Clemente (Đồng sáng lập & CDO @
Bạn có thể tìm thêm thông tin về các gói PMNM trong các tài liệu tương ứng:
Cũng được xuất bản ở đây