Chạy lại nguồn mở vào tháng 2 đã đánh dấu một bước quan trọng đối với những người đang tìm kiếm các thư viện trực quan hóa Python mạnh nhưng có thể truy cập được.
Trực quan hóa là điều cần thiết vì các công ty như Scale.ai, Weights & Biases và Hugging Face đã hợp lý hóa deep learning bằng cách giải quyết vấn đề ghi nhãn tập dữ liệu, theo dõi thử nghiệm và các mô hình được đào tạo trước. Tuy nhiên, vẫn còn khoảng trống trong việc thu thập và trực quan hóa dữ liệu nhanh chóng.
Nhiều công ty phát triển các giải pháp trực quan hóa dữ liệu nội bộ nhưng thường kết thúc bằng các công cụ dưới mức tối ưu do chi phí phát triển cao. Hơn nữa, trực quan hóa Python trên luồng dữ liệu cũng là một vấn đề chưa được giải quyết tốt, dẫn đến các giải pháp dựa trên JavaScrip trong sổ ghi chép. Chạy lại tận dụng giao diện Python thành một công cụ trực quan hóa Rust hiệu suất cao (giống như Bytewax!) giúp dễ dàng phân tích dữ liệu phát trực tuyến.
Trong bài đăng trên blog này, chúng ta sẽ khám phá cách sử dụng Bytewax và Chạy lại để trực quan hóa dữ liệu phát trực tuyến theo thời gian thực trong Python và tạo trực quan hóa phát hiện bất thường theo thời gian thực.
Chúng tôi đã chọn phát hiện bất thường, hay còn gọi là phát hiện ngoại lệ vì đây là một thành phần quan trọng trong nhiều ứng dụng, chẳng hạn như an ninh mạng, phát hiện gian lận và giám sát các quy trình công nghiệp. Hình dung những điểm bất thường này trong thời gian thực có thể giúp xác định nhanh các vấn đề tiềm ẩn và thực hiện các hành động cần thiết để giảm thiểu chúng.
Đối với những người háo hức tham gia, hãy xem giải pháp Python từ đầu đến cuối trên GitHub của chúng tôi. Đừng quên đánh dấu sao Bytewax!
Đây là những gì chúng tôi sẽ đề cập:
Đi nào!
Bài đăng trên blog này dựa trên các phiên bản sau của Bytewax và Chạy lại:
bytewax==0.15.1 rerun-sdk==0.4.0
Chạy lại và Bytewax có thể cài đặt như
pip install rerun-sdk pip install bytewax
Theo dõi Bytewax để biết các bản cập nhật vì chúng tôi đang phát hành một phiên bản mới sẽ giúp dễ dàng phát triển hơn nữa các ứng dụng truyền dữ liệu trong Python.
Giải pháp tương đối nhỏ gọn, vì vậy chúng tôi sao chép toàn bộ ví dụ về mã tại đây. Xin vui lòng bỏ qua phần lớn này nếu nó có vẻ quá sức; chúng ta sẽ thảo luận về từng chức năng sau.
import random # pip install rerun-sdk import rerun as rr from time import sleep from datetime import datetime from bytewax.dataflow import Dataflow from bytewax.execution import spawn_cluster from bytewax.inputs import ManualInputConfig, distribute from bytewax.outputs import ManualOutputConfig rr.init("metrics") rr.spawn() start = datetime.now() def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in this_workers_keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0) class ZTestDetector: """Anomaly detector. Use with a call to flow.stateful_map(). Looks at how many standard deviations the current item is away from the mean (Z-score) of the last 10 items. Mark as anomalous if over the threshold specified. """ def __init__(self, threshold_z): self.threshold_z = threshold_z self.last_10 = [] self.mu = None self.sigma = None def _push(self, value): self.last_10.insert(0, value) del self.last_10[10:] def _recalc_stats(self): last_len = len(self.last_10) self.mu = sum(self.last_10) / last_len sigma_sq = sum((value - self.mu) ** 2 for value in self.last_10) / last_len self.sigma = sigma_sq**0.5 def push(self, key__value__t): key, value, t = key__value__t is_anomalous = False if self.mu and self.sigma: is_anomalous = abs(value - self.mu) / self.sigma > self.threshold_z self._push(value) self._recalc_stats() rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1) return self, (value, self.mu, self.sigma, is_anomalous) def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector if __name__ == '__main__': flow = Dataflow() flow.input("input", ManualInputConfig(generate_random_metrics)) # ("metric", value) flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push) # ("metric", (value, mu, sigma, is_anomalous)) flow.capture(ManualOutputConfig(output_builder)) spawn_cluster(flow)
Mã được cung cấp trình bày cách tạo quy trình phát hiện bất thường theo thời gian thực bằng cách sử dụng Bytewax và Chạy lại.
Hãy chia nhỏ các thành phần thiết yếu của mã này:
generate_random_metrics : Hàm này tạo số liệu ngẫu nhiên mô phỏng các luồng dữ liệu trong thế giới thực. Nó tạo ra các điểm dữ liệu với khả năng xảy ra bất thường nhỏ (giá trị tăng gấp đôi).
ZTestDetector : Lớp này triển khai trình phát hiện bất thường bằng phương pháp điểm Z. Nó duy trì giá trị trung bình và độ lệch chuẩn của 10 giá trị gần nhất và đánh dấu một giá trị là bất thường nếu điểm Z của nó lớn hơn một ngưỡng đã chỉ định.
output_builder : Chức năng này được sử dụng để xác định hành vi đầu ra cho đường dẫn dữ liệu. Trong trường hợp này, nó in tên chỉ số, giá trị, giá trị trung bình, độ lệch chuẩn và giá trị có bất thường hay không.
Luồng dữ liệu : Phần chính của mã xây dựng luồng dữ liệu bằng cách sử dụng Bytewax, kết nối RandomMetricInput, ZTestDetector và trình tạo đầu ra.
Trực quan hóa chạy lại : Trực quan hóa Chạy lại được tích hợp vào lớp ZTestDetector. Các hàm rr.log_scalar và rr.log_point được sử dụng để vẽ các điểm dữ liệu và trạng thái bất thường tương ứng của chúng.
Bây giờ, với sự hiểu biết về các thành phần chính của mã, chúng ta hãy thảo luận về cách tạo trực quan hóa từng bước.
Để tạo một đường dẫn luồng dữ liệu, bạn cần phải:
flow = Dataflow()
.flow.input("input", ManualInputConfig(generate_random_metrics))
.flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)
.flow.capture(ManualOutputConfig(output_builder))
.spawn_cluster(flow, proc_count=3)
.
Luồng dữ liệu kết quả đọc các giá trị chỉ số được tạo ngẫu nhiên từ input_builder
, chuyển chúng qua ZTestDetector
để phát hiện bất thường và xuất kết quả bằng cách sử dụng hàm output_builder
. Hãy làm rõ các chi tiết cho từng bước.
generate_random_metrics
Hàm generate_random_metrics
đóng vai trò là nguồn đầu vào thay thế cho đường ống luồng dữ liệu, tạo các giá trị chỉ số ngẫu nhiên theo cách phân tán trên nhiều công nhân. Nó chấp nhận ba tham số: worker_index
, worker_count
và resume_state
.
def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0)
worker_index
: Chỉ mục của worker hiện tại trong đường dẫn luồng dữ liệu.
worker_count
: Tổng số công nhân trong đường ống luồng dữ liệu.
resume_state
: Trạng thái của nguồn đầu vào để tiếp tục. Trong trường hợp này, nó được khẳng định là None
, cho biết rằng nguồn đầu vào không hỗ trợ tiếp tục từ trạng thái trước đó.
Dưới đây là mô tả từng bước của hàm generate_random_metrics
:
resume_state
là None
.Tạo một giá trị ngẫu nhiên trong khoảng từ 0 đến 10.
Với xác suất 10%, hãy nhân đôi giá trị để mô phỏng sự bất thường.
Mang lại một bộ chứa Không (để biểu thị không có khóa phân vùng cụ thể), khóa, giá trị được tạo và thời gian đã trôi qua kể từ thời điểm bắt đầu (không được cung cấp trong đoạn mã).
Giới thiệu thời gian ngủ giữa mỗi giá trị được tạo để mô phỏng việc tạo dữ liệu theo thời gian thực.
Hàm generate_random_metrics
được sử dụng trong luồng dữ liệu làm nguồn đầu vào với dòng mã sau:
flow.input("input", ManualInputConfig(generate_random_metrics))
Dòng này báo cho luồng dữ liệu sử dụng lớp RandomMetricInput
để tạo dữ liệu đầu vào cho đường ống.
ZTestDetector
Lớp ZTestDetector
là một trình phát hiện bất thường sử dụng phương pháp điểm Z để xác định xem một điểm dữ liệu có bất thường hay không. Điểm Z là số độ lệch chuẩn của một điểm dữ liệu so với giá trị trung bình của một tập dữ liệu. Nếu điểm Z của một điểm dữ liệu cao hơn ngưỡng đã chỉ định, điểm đó được coi là bất thường.
Lớp có các phương thức sau:
__init__(self, threshold_z)
: Hàm tạo khởi tạo ZTestDetector với giá trị điểm Z ngưỡng. Nó cũng khởi tạo danh sách 10 giá trị cuối cùng (self.last_10), giá trị trung bình (self.mu) và độ lệch chuẩn (self.sigma).
_push(self, value)
: Phương thức riêng tư này được sử dụng để cập nhật danh sách 10 giá trị cuối cùng với giá trị mới. Nó chèn giá trị mới vào đầu danh sách và xóa giá trị cũ nhất, duy trì độ dài danh sách ở mức 10.
_recalc_stats(self)
: Phương thức riêng tư này tính toán lại giá trị trung bình và độ lệch chuẩn dựa trên các giá trị hiện tại trong danh sách self.last_10.
push(self, key__value__t)
: Phương thức công khai này lấy một bộ chứa khóa, giá trị và dấu thời gian làm đầu vào. Nó tính toán điểm Z cho giá trị, cập nhật danh sách 10 giá trị cuối cùng và tính toán lại giá trị trung bình và độ lệch chuẩn. Nó cũng ghi lại điểm dữ liệu và trạng thái bất thường của nó bằng cách sử dụng các chức năng trực quan hóa của Chạy lại. Cuối cùng, nó trả về thể hiện đã cập nhật của lớp ZTestDetector và một bộ chứa giá trị, giá trị trung bình, độ lệch chuẩn và trạng thái bất thường.
Lớp ZTestDetector được sử dụng trong đường dẫn luồng dữ liệu dưới dạng bản đồ trạng thái với mã sau:
flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)
Dòng này báo cho luồng dữ liệu áp dụng ZTestDetector
với ngưỡng điểm Z là 2.0
và sử dụng phương thức push
để xử lý các điểm dữ liệu.
Để trực quan hóa các điểm bất thường, lớp ZTestDetector
ghi lại các điểm dữ liệu và trạng thái bất thường tương ứng của chúng bằng cách sử dụng các chức năng trực quan hóa của Chạy lại. Cụ thể, rr.log_scalar
được sử dụng để vẽ một giá trị vô hướng, trong khi rr.log_point
được sử dụng để vẽ các điểm 3D.
Đoạn mã sau đây cho biết cách tạo trực quan hóa:
rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1)
Ở đây, trước tiên chúng tôi ghi lại một giá trị vô hướng đại diện cho số liệu. Sau đó, tùy thuộc vào việc giá trị có bất thường hay không, chúng tôi ghi lại một điểm 3D có bán kính và màu khác. Các điểm bất thường được ghi bằng màu đỏ với bán kính lớn hơn, trong khi các điểm không dị thường được ghi bằng bán kính nhỏ hơn.
output_builder
Hàm output_builder
được sử dụng để xác định hành vi đầu ra cho đường dẫn dữ liệu. Trong ví dụ cụ thể này, nó chịu trách nhiệm in tên chỉ số, giá trị, giá trị trung bình, độ lệch chuẩn và liệu giá trị đó có bất thường hay không.
Hàm nhận hai đối số: worker_index
và worker_count
. Các đối số này giúp hàm hiểu chỉ mục của công nhân và tổng số công nhân trong đường ống luồng dữ liệu.
Đây là định nghĩa của hàm output_builder
:
def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector
Hàm này là hàm bậc cao hơn, có nghĩa là nó trả về một hàm khác có tên là inspector
. Hàm inspector
chịu trách nhiệm xử lý bộ dữ liệu đầu vào và in đầu ra mong muốn.
Hàm trình tạo đầu ra sau đó được sử dụng trong đường ống luồng dữ liệu khi định cấu hình hành vi đầu ra với
flow.capture(ManualOutputConfig(output_builder)).
Bytewax có thể chạy dưới dạng một quy trình đơn lẻ hoặc theo cách đa quy trình. Luồng dữ liệu này đã được tạo để mở rộng quy mô trên nhiều quy trình, nhưng chúng tôi sẽ bắt đầu chạy nó dưới dạng một quy trình duy nhất với mô-đun thực thi spawn_cluster
.
spawn_cluster(flow)
Nếu chúng tôi muốn tăng tính song song, chúng tôi chỉ cần thêm nhiều quy trình hơn làm đối số.
Ví dụ - spawn_cluster(flow, proc_count=3)
.
Để chạy mã được cung cấp, chúng ta chỉ cần chạy nó dưới dạng tập lệnh Python, nhưng trước tiên chúng ta cần cài đặt các phụ thuộc.
Tạo một tệp mới trong cùng thư mục với dataflow.py và đặt tên là tests.txt.
Thêm nội dung sau vào tệp tests.txt:
bytewax==0.15.1 rerun-sdk==0.4.0
Mở một thiết bị đầu cuối trong thư mục chứa các tệp tests.txt và dataflow.py.
Cài đặt các phụ thuộc bằng lệnh sau:
pip install -r requirements.txt
Và chạy luồng dữ liệu!
python dataflow.py
Mặc dù mã được cung cấp đóng vai trò là một ví dụ cơ bản về phát hiện bất thường theo thời gian thực, nhưng bạn có thể mở rộng quy trình này để phù hợp với các tình huống phức tạp hơn.
Ví dụ:
Kết hợp các nguồn dữ liệu trong thế giới thực : Thay thế lớp RandomMetricInput bằng một lớp tùy chỉnh đọc dữ liệu từ nguồn trong thế giới thực, chẳng hạn như cảm biến IoT, tệp nhật ký hoặc API phát trực tuyến.
Triển khai các kỹ thuật phát hiện bất thường tinh vi hơn : Bạn có thể thay thế lớp ZTestDetector bằng các phương pháp phát hiện bất thường có trạng thái khác, chẳng hạn như trung bình động, làm mịn theo cấp số nhân hoặc các phương pháp tiếp cận dựa trên máy học.
Tùy chỉnh trực quan hóa : Nâng cao trực quan hóa Chạy lại bằng cách thêm nhiều kích thước dữ liệu hơn, điều chỉnh bảng phối màu hoặc sửa đổi kiểu cốt truyện để phù hợp hơn với nhu cầu của bạn.
Tích hợp với các hệ thống cảnh báo và giám sát : Thay vì chỉ in các kết quả bất thường, bạn có thể tích hợp đường ống với các hệ thống cảnh báo hoặc giám sát để thông báo cho các bên liên quan thích hợp khi phát hiện thấy sự bất thường.
Bằng cách tùy chỉnh và mở rộng đường ống luồng dữ liệu, bạn có thể tạo giải pháp trực quan hóa và phát hiện bất thường theo thời gian thực mạnh mẽ phù hợp với trường hợp sử dụng cụ thể của mình. Sự kết hợp giữa Bytewax và Rerun cung cấp một nền tảng linh hoạt và có thể mở rộng để xây dựng các hệ thống trực quan hóa và xử lý dữ liệu thời gian thực.
Bài đăng trên blog này đã trình bày cách sử dụng Bytewax và Chạy lại để tạo trực quan hóa phát hiện bất thường theo thời gian thực. Bằng cách xây dựng một đường dẫn luồng dữ liệu với Bytewax và tích hợp các khả năng trực quan hóa mạnh mẽ của Chạy lại, chúng tôi có thể theo dõi và xác định các điểm bất thường trong dữ liệu của mình khi chúng xảy ra.
Nguyên văn bởi Zander Matheson ở đây.