Trong bài đăng trên blog này, chúng tôi sẽ đề cập đến cách bạn có thể kết hợp và tận dụng giải pháp phát trực tuyến nguồn mở, , với , để cải thiện chất lượng luồng phát trực tuyến của bạn. Thắt dây an toàn! bytewax ydata-profiling Quá trình xử lý luồng cho phép phân tích dữ liệu trong thời gian thực và trước khi lưu trữ và có thể ở hoặc . trạng thái không trạng thái được sử dụng cho đề xuất , phát hiện mẫu hoặc xử lý sự kiện phức tạp, trong đó lịch sử của những gì đã xảy ra là bắt buộc đối với quá trình xử lý (cửa sổ, tham gia bằng khóa, v.v.). Xử lý luồng trạng thái thời gian thực được sử dụng để chuyển đổi không yêu cầu kiến thức về các điểm dữ liệu khác trong luồng như ẩn email hoặc chuyển đổi loại. Xử lý luồng không trạng thái nội tuyến Nhìn chung, các luồng dữ liệu được sử dụng rộng rãi trong ngành và có thể được áp dụng cho các trường hợp sử dụng như , hoặc . phát hiện gian lận theo dõi bệnh nhân bảo trì dự đoán sự kiện Một khía cạnh quan trọng mà tất cả các luồng dữ liệu phải xem xét là chất lượng của dữ liệu Không giống như các mô hình truyền thống nơi chất lượng dữ liệu thường được đánh giá trong quá trình tạo giải pháp bảng điều khiển hoặc kho dữ liệu, . truyền dữ liệu yêu cầu giám sát liên tục Điều cần thiết là duy trì chất lượng dữ liệu trong toàn bộ quá trình, từ thu thập đến cung cấp cho các ứng dụng xuôi dòng. Xét cho cùng, chi phí của chất lượng dữ liệu kém có thể cao đối với các tổ chức: “Cái giá của dữ liệu xấu chiếm tới 15% đến 25% doanh thu đáng kinh ngạc đối với hầu hết các công ty. (…) Hai phần ba chi phí này có thể được loại bỏ bằng cách đi đầu về chất lượng dữ liệu.” — Thomas C. Redman, tác giả cuốn “Dẫn đầu về chất lượng dữ liệu” Trong suốt bài viết này, chúng tôi sẽ chỉ cho bạn cách bạn có thể kết hợp với để lập hồ sơ và cải thiện chất lượng luồng phát của bạn! bytewa ydata-profiling Xử lý luồng cho chuyên gia dữ liệu với Bytewax là một khung xử lý luồng OSS được thiết kế dành riêng cho các nhà phát triển Python. sáp ong Nó cho phép người dùng với các khả năng tương tự như Flink, Spark và Kafka Streams đồng thời cung cấp giao diện thân thiện và quen thuộc cũng như xây dựng các đường ống truyền dữ liệu và ứng dụng thời gian thực khả năng tương thích 100% với hệ sinh thái Python. Sử dụng tích hợp hoặc các thư viện Python hiện có, (Kafka, RedPanda, WebSocket, v.v.) và ra các hệ thống hạ nguồn khác nhau (Kafka, tệp sàn gỗ, hồ dữ liệu, v.v.). kết nối bạn có thể kết nối với các nguồn dữ liệu trực tuyến và thời gian thực ghi dữ liệu đã chuyển đổi Đối với các phép biến đổi, Bytewax bằng các phương thức , và , đồng thời đi kèm với các tính năng quen thuộc như khôi phục và khả năng mở rộng. tạo điều kiện cho các phép biến đổi trạng thái và không trạng thái bản đồ cửa sổ tổng hợp sáp ong và được . tạo điều kiện cho trải nghiệm ưu tiên Python và lấy dữ liệu làm trung tâm cho các luồng dữ liệu xây dựng có chủ đích cho các kỹ sư dữ liệu và nhà khoa học dữ liệu Nó cho phép người dùng , đồng thời tạo các tùy chỉnh cần thiết để đáp ứng nhu cầu của họ mà không cần phải tìm hiểu và duy trì các nền tảng phát trực tuyến dựa trên JVM như Spark hoặc Flink. xây dựng các đường dẫn truyền dữ liệu và ứng dụng thời gian thực Bytewax rất phù hợp cho nhiều trường hợp sử dụng, cụ thể là, , , , và hơn thế nữa. Nhúng đường ống cho AI sáng tạo Xử lý các giá trị bị thiếu trong luồng dữ liệu Sử dụng các mô hình ngôn ngữ trong bối cảnh phát trực tuyến để hiểu thị trường tài chính Để biết cảm hứng về trường hợp sử dụng và biết thêm thông tin như tài liệu, hướng dẫn và hướng dẫn, vui lòng kiểm tra . trang web bywax Tại sao lập hồ sơ dữ liệu cho luồng dữ liệu? và đề cập đến bước : cấu trúc, hành vi và chất lượng của nó. Lập hồ sơ dữ liệu là chìa khóa để bắt đầu thành công bất kỳ nhiệm vụ học máy nào hiểu thấu đáo dữ liệu của chúng tôi Tóm lại, liên quan đến việc phân tích các khía cạnh liên quan đến định dạng của dữ liệu và các mô tả cơ bản (ví dụ: số lượng mẫu, số lượng/loại tính năng, giá trị trùng lặp), (chẳng hạn như thiếu dữ liệu hoặc tính năng không cân bằng) và các yếu tố phức tạp khác có thể phát sinh trong quá trình thu thập hoặc xử lý dữ liệu (ví dụ: giá trị sai hoặc tính năng không nhất quán). hồ sơ dữ liệu đặc điểm nội tại , trong đó hoàn cảnh có thể thay đổi nhanh chóng và có thể yêu cầu hành động ngay lập tức (ví dụ: giám sát chăm sóc sức khỏe, giá trị kho hàng, chính sách chất lượng không khí). Việc đảm bảo các tiêu chuẩn chất lượng dữ liệu cao là rất quan trọng đối với tất cả các miền và tổ chức, nhưng đặc biệt phù hợp với các miền hoạt động với các miền xuất dữ liệu liên tục Đối với nhiều miền, cấu hình dữ liệu được sử dụng từ lăng kính phân tích dữ liệu khám phá, xem xét dữ liệu lịch sử được lưu trữ trong cơ sở dữ liệu. Ngược lại, đối với các luồng dữ liệu, , trong đó dữ liệu cần được kiểm tra ở các khung thời gian hoặc giai đoạn khác nhau của quy trình. việc lập hồ sơ dữ liệu trở nên cần thiết để xác thực và kiểm soát chất lượng liên tục dọc theo luồng Bằng cách nhúng , chúng tôi có thể về trạng thái hiện tại của dữ liệu và được cảnh báo về bất kỳ vấn đề nghiêm trọng tiềm ẩn nào — cho dù chúng có liên quan đến (ví dụ: giá trị bị hỏng hoặc thay đổi định dạng) hoặc (ví dụ: dữ liệu trôi dạt, sai lệch so với quy tắc và kết quả kinh doanh). hồ sơ tự động vào luồng dữ liệu của mình ngay lập tức nhận phản hồi tính nhất quán và tính toàn vẹn của dữ liệu các sự kiện xảy ra trong khoảng thời gian ngắn Trong các miền trong thế giới thực — — lập hồ sơ tự động có thể cứu chúng ta khỏi nhiều câu đố hóc búa và các hệ thống cần phải ngừng sản xuất! nơi bạn chỉ cần biết định luật Murphy chắc chắn sẽ xảy ra và “mọi thứ chắc chắn có thể sai sót” Trong những gì liên quan đến lập hồ sơ dữ liệu, luôn là một , hoặc cho hoặc dữ liệu. Và không có thắc mắc tại sao — ydata-profiling đám đông yêu thích dạng bảng chuỗi thời gian đó là một dòng mã cho một tập hợp phân tích và thông tin chi tiết sâu rộng. Các hoạt động phức tạp và tốn thời gian được thực hiện ẩn: cấu hình ydata và tùy thuộc vào loại đối tượng (dạng số hoặc phân loại), nó được hiển thị trong báo cáo lược tả. tự động phát hiện các loại đối tượng có trong dữ liệu điều chỉnh thống kê tóm tắt và hình ảnh hóa Thúc đẩy , gói này cũng , tập trung vào và theo cặp của chúng và cung cấp , từ các giá trị hoặc đến các tính năng và . phân tích tập trung vào dữ liệu làm nổi bật các mối quan hệ hiện có giữa các tính năng các tương tác mối tương quan đánh giá kỹ lưỡng về cảnh báo chất lượng dữ liệu trùng lặp không đổi bị lệch mất cân bằng Đó thực sự là chế — với nỗ lực tối thiểu. độ xem 360º về chất lượng dữ liệu của chúng tôi Kết hợp tất cả lại với nhau: hồ sơ bytewax và ydata Trước khi bắt đầu dự án, trước tiên chúng ta cần đặt các phụ thuộc Python và định cấu hình nguồn dữ liệu của mình. Trước tiên, hãy cài đặt các gói cấu hình và ( bytewax ydata-profiling Bạn có thể muốn sử dụng một môi trường ảo cho việc này — kiểm tra các hướng dẫn này nếu bạn cần một số hướng dẫn thêm!) pip install bytewax==0.16.2 ydata-profiling==4.3.1 Sau đó, chúng tôi sẽ tải lên (Giấy phép — CC0: Miền công cộng), chứa một số phép đo từ các thiết bị IoT khác nhau: Bộ dữ liệu từ xa cảm biến môi trường nhiệt độ, độ ẩm, khí dầu mỏ lỏng carbon monoxide, khói, ánh sáng và chuyển động wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000 và đầu vào sẽ trông giống như những gì chúng ta mong đợi trong một nền tảng phát trực tuyến . Trong bài viết này, và tạo luồng dữ liệu bằng cách sử dụng bytewax. Trong môi trường sản xuất, các phép đo này sẽ được tạo liên tục bởi từng thiết bị chẳng hạn như Kafka để mô phỏng bối cảnh mà chúng tôi sẽ tìm thấy với dữ liệu truyền trực tuyến, chúng tôi sẽ đọc dữ liệu từ tệp CSV từng dòng một (Xin lưu ý nhanh, luồng dữ liệu về cơ bản là một đường dẫn dữ liệu có thể được mô tả dưới dạng biểu đồ tuần hoàn có hướng - DAG) Trước tiên, hãy thực hiện một số : nhập khẩu cần thiết from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main Sau đó, chúng tôi xác định đối tượng luồng dữ liệu của mình. Sau đó, chúng tôi sẽ sử dụng một phương pháp bản đồ không trạng thái trong đó chúng tôi chuyển vào một hàm để chuyển đổi chuỗi thành đối tượng DateTime và cấu trúc lại dữ liệu thành định dạng (device_id, data). Phương thức bản đồ sẽ thực hiện thay đổi đối với từng điểm dữ liệu theo cách không trạng thái. Lý do chúng tôi đã sửa đổi hình dạng dữ liệu của mình là để chúng tôi có thể dễ dàng nhóm dữ liệu trong các bước tiếp theo để lập hồ sơ dữ liệu cho từng thiết bị riêng biệt thay vì cho tất cả các thiết bị cùng một lúc. flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data)) Bây giờ, chúng tôi sẽ tận dụng các khả năng có trạng thái của để thu thập dữ liệu cho từng thiết bị trong khoảng thời gian mà chúng tôi đã xác định. mong đợi một ảnh chụp nhanh dữ liệu theo thời gian, điều này làm cho toán tử cửa sổ trở thành phương pháp hoàn hảo để sử dụng để thực hiện việc này. bytewax ydata-profiling Trong , chúng tôi có thể tạo số liệu thống kê tóm tắt cho một khung dữ liệu được chỉ định cho một ngữ cảnh cụ thể. Chẳng hạn, trong ví dụ của chúng tôi, chúng tôi có thể tạo ảnh chụp nhanh dữ liệu liên quan đến từng thiết bị IoT hoặc các khung thời gian cụ thể: ydata-profiling from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print) Sau khi các ảnh chụp nhanh được xác định, việc tận dụng cũng đơn giản như gọi cho từng khung dữ liệu mà chúng tôi muốn phân tích: ydata-profiling ProfileReport import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile) Trong ví dụ này, chúng tôi đang ghi hình ảnh ra tệp cục bộ như một phần của chức năng trong phương thức bản đồ. Chúng có thể được báo cáo thông qua một công cụ nhắn tin hoặc chúng tôi có thể lưu chúng vào một số bộ lưu trữ từ xa trong tương lai. Khi hồ sơ hoàn tất, luồng dữ liệu sẽ mong đợi một số đầu ra để chúng tôi có thể sử dụng tích hợp để in thiết bị đã được lập hồ sơ và thời gian thiết bị được lập hồ sơ tại thời điểm đó đã được chuyển ra khỏi chức năng hồ sơ trong bước bản đồ: StdOutput flow.output("out", StdOutput()) Có nhiều cách để thực thi luồng dữ liệu Bytewax. Trong ví dụ này, chúng tôi sử dụng cùng một máy cục bộ, nhưng Bytewax cũng có thể chạy trên nhiều quy trình Python, trên nhiều máy chủ, trong một , sử dụng một , Và . bộ chứa docker Cụm Kubernetes hơn Trong bài viết này, chúng tôi sẽ tiếp tục với thiết lập cục bộ, nhưng chúng tôi khuyến khích bạn kiểm tra công cụ trợ giúp của chúng tôi quản lý việc triển khai luồng dữ liệu Kubernetes sau khi quy trình của bạn sẵn sàng chuyển sang sản xuất. waxctl Giả sử chúng ta đang ở trong cùng thư mục với tệp có định nghĩa luồng dữ liệu, chúng ta có thể chạy nó bằng cách sử dụng: python -m bytewax.run ydata-profiling-streaming:flow Sau đó, chúng tôi có thể sử dụng các báo cáo định hình để xác thực chất lượng dữ liệu, kiểm tra các thay đổi trong lược đồ hoặc định dạng dữ liệu và . so sánh các đặc điểm dữ liệu giữa các thiết bị hoặc khung thời gian khác nhau Trên thực tế, chúng ta có thể tận dụng làm nổi bật sự khác biệt giữa hai hồ sơ dữ liệu một cách đơn giản, giúp chúng tôi dễ dàng phát hiện các mẫu quan trọng cần điều tra hoặc các vấn đề cần giải quyết: chức năng báo cáo so sánh snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html") Bạn đã sẵn sàng khám phá các luồng dữ liệu của riêng mình chưa? Việc xác thực các luồng dữ liệu là rất quan trọng để xác định các vấn đề về chất lượng dữ liệu một cách liên tục và so sánh trạng thái của dữ liệu trong các khoảng thời gian khác nhau. Đối với các tổ chức trong , , và — tất cả đều hoạt động với các luồng dữ liệu liên tục — một , từ đánh giá chất lượng đến quyền riêng tư của dữ liệu. lĩnh vực chăm sóc sức khỏe năng lượng sản xuất giải trí hồ sơ tự động hóa là chìa khóa để thiết lập các phương pháp hay nhất về quản trị dữ liệu Điều này yêu cầu phân tích các ảnh chụp nhanh dữ liệu, như được giới thiệu trong bài viết này, có thể đạt được một cách liền mạch bằng cách kết hợp và . bytewax ydata-profiling đảm nhận tất cả các quy trình cần thiết để xử lý và cấu trúc các luồng dữ liệu thành ảnh chụp nhanh, sau đó có thể tóm tắt và so sánh với thông qua một báo cáo toàn diện về các đặc điểm dữ liệu. Bytewax hồ sơ ydata Khả năng xử lý và lập hồ sơ dữ liệu đến một cách thích hợp sẽ mở ra rất nhiều trường hợp sử dụng trên các miền khác nhau, từ đến làm nổi bật và giảm thiểu các sự cố bổ sung xuất phát từ các hoạt động trong thế giới thực, chẳng hạn như (ví dụ: phát hiện gian lận hoặc xâm nhập/đe dọa), và các sự kiện khác không như mong đợi (ví dụ: dữ liệu trôi dạt hoặc sai lệch với quy tắc kinh doanh). sửa lỗi trong lược đồ và định dạng dữ liệu phát hiện bất thường sự cố thiết bị Bây giờ, bạn đã sẵn sàng để bắt đầu khám phá các luồng dữ liệu của mình! Hãy cho chúng tôi biết những trường hợp sử dụng khác mà bạn tìm thấy và như mọi khi, vui lòng gửi cho chúng tôi một dòng trong nhận xét hoặc tìm chúng tôi tại để biết thêm câu hỏi và gợi ý! Cộng đồng AI tập trung vào dữ liệu Hẹn gặp bạn ở đó! Sự nhìn nhận ) và Oli Makhasoeva (Quan hệ nhà phát triển @ ) -- đang phát triển . Bài viết này được viết bởi Fabiana Clemente (Đồng sáng lập & CDO @ Ydữ liệu ) và Miriam Santos (Quan hệ nhà phát triển @ Ydữ liệu ) -- đang phát triển hồ sơ ydata -- và Zander Matheson (CEO & Founder @ sáp ong sáp ong sáp ong Bạn có thể tìm thêm thông tin về các gói PMNM trong các tài liệu tương ứng: & . tài liệu hồ sơ ydata tài liệu bytewax Cũng được xuất bản ở đây