Hi, my name is Andrew! I oversee a large-scale online service where our queues (Kafka or similar) process millions of events each day. We have to spot anomalies quickly, track key metrics, and notify the team about critical issues—all in real time. Because data streams in from various systems, it’s never as simple as using one language or tool. Some engineers favor Go for performance and concurrency, others prefer Python for machine learning, and Node.js often wins for handling external integrations (Slack, webhooks, etc.). On top of that, our DevOps folks rely heavily on classic CLI tools like tar and gzip for archiving logs. tar gzip At first, everything was scattered across different scripts, containers, and services. It worked, but it got messy—too many handoffs, too many config files, and a constant risk of something breaking when one team decided to update their part of the puzzle. So we decided to unify the entire process in one computational graph. The solution we came up with is a local-first FaaS approach where each “node” can be written in whatever language or tool we want, but everything is orchestrated under a single workflow. Below is how we designed each node in our real-time pipeline, why we chose each language or tool, and how data flows between them via simple JSON adapters. one Node 1 (Rust): High-Performance Ingestion Why Rust? We needed maximum throughput and memory safety. Our ingestion service handles hundreds of thousands of events per minute from sources like IoT devices and microservices in our cluster. Rust’s ownership model helps avoid memory leaks and race conditions, which is crucial when your service runs 24/7. Example Rust code (node_rust_ingest.rs): Example Rust code ( node_rust_ingest.rs use std::fs; use serde::{Deserialize, Serialize}; #[derive(Deserialize)] struct IngestConfig { pub source_path: String, } fn main() -> Result<(), Box<dyn std::error::Error>> { // Read config from JSON let config_data = fs::read_to_string("config.json")?; let config: IngestConfig = serde_json::from_str(&config_data)?; // Imagine reading from a high-volume stream here: let raw_events = vec![ "temperature:24.5 sensor_id=alpha", "temperature:25.1 sensor_id=beta", ]; // Convert the raw events into JSON and save let json_str = serde_json::to_string_pretty(&raw_events)?; fs::write("raw_events.json", json_str)?; Ok(()) } use std::fs; use serde::{Deserialize, Serialize}; #[derive(Deserialize)] struct IngestConfig { pub source_path: String, } fn main() -> Result<(), Box<dyn std::error::Error>> { // Read config from JSON let config_data = fs::read_to_string("config.json")?; let config: IngestConfig = serde_json::from_str(&config_data)?; // Imagine reading from a high-volume stream here: let raw_events = vec![ "temperature:24.5 sensor_id=alpha", "temperature:25.1 sensor_id=beta", ]; // Convert the raw events into JSON and save let json_str = serde_json::to_string_pretty(&raw_events)?; fs::write("raw_events.json", json_str)?; Ok(()) } Input: A JSON file named config.json (handled by an adapter, e.g., adapter_input_json("config.json")). Output: raw_events.json with the raw events. Input: A JSON file named config.json (handled by an adapter, e.g., adapter_input_json("config.json")). Input config.json adapter_input_json("config.json") Output: raw_events.json with the raw events. Output raw_events.json In practice, this Rust code might pull real messages from Kafka or a custom TCP socket, but the idea is the same: read something from a stream, then write it out as JSON. Node 2 (Go): Filtering and Normalization Why Go? It’s great for concurrency (goroutines and channels) and compiles into a single binary we can deploy anywhere. This node focuses on quick filtering of raw data, dropping empty or invalid records, and transforming some fields before sending them off for deeper analysis. Example Go code (node_go_filter.go): Example Go code ( node_go_filter.go package main import ( "encoding/json" "fmt" "io/ioutil" "os" "strings" ) type FilteredEvent struct { Valid bool `json:\"valid\"` Data string `json:\"data\"` } func main() { // Read raw events JSON from file (adapter does the path resolution) rawData, err := ioutil.ReadFile(\"raw_events.json\") if err != nil { fmt.Println(\"Error reading file:\", err) os.Exit(1) } var events []string if err := json.Unmarshal(rawData, &events); err != nil { fmt.Println(\"Error unmarshalling JSON:\", err) os.Exit(1) } // Filter out any malformed or empty events var filtered []FilteredEvent for _, e := range events { if strings.TrimSpace(e) == \"\" { continue } filtered = append(filtered, FilteredEvent{ Valid: true, Data: e, }) } // Write filtered events out to JSON output, err := json.MarshalIndent(filtered, \"\", \" \") if err != nil { fmt.Println(\"Error marshalling JSON:\", err) os.Exit(1) } if err := ioutil.WriteFile(\"filtered_events.json\", output, 0644); err != nil { fmt.Println(\"Error writing file:\", err) os.Exit(1) } } package main import ( "encoding/json" "fmt" "io/ioutil" "os" "strings" ) type FilteredEvent struct { Valid bool `json:\"valid\"` Data string `json:\"data\"` } func main() { // Read raw events JSON from file (adapter does the path resolution) rawData, err := ioutil.ReadFile(\"raw_events.json\") if err != nil { fmt.Println(\"Error reading file:\", err) os.Exit(1) } var events []string if err := json.Unmarshal(rawData, &events); err != nil { fmt.Println(\"Error unmarshalling JSON:\", err) os.Exit(1) } // Filter out any malformed or empty events var filtered []FilteredEvent for _, e := range events { if strings.TrimSpace(e) == \"\" { continue } filtered = append(filtered, FilteredEvent{ Valid: true, Data: e, }) } // Write filtered events out to JSON output, err := json.MarshalIndent(filtered, \"\", \" \") if err != nil { fmt.Println(\"Error marshalling JSON:\", err) os.Exit(1) } if err := ioutil.WriteFile(\"filtered_events.json\", output, 0644); err != nil { fmt.Println(\"Error writing file:\", err) os.Exit(1) } } Input: raw_events.json. Output: filtered_events.json with an array of cleaned-up records. Input: raw_events.json. Input raw_events.json Output: filtered_events.json with an array of cleaned-up records. Output filtered_events.json Node 3 (Python): ML Classification Why Python? Most data science and ML teams rely on Python’s ecosystem (NumPy, scikit-learn, PyTorch, etc.). It’s easy to iterate quickly and load pretrained models. Example Python code (node_python_ml.py): Example Python code ( node_python_ml.py import json def main(): # Load filtered events from JSON with open('filtered_events.json', 'r') as f: filtered_data = json.load(f) # Imagine we have a pre-trained model loaded here # For illustration, let's just mock predictions predicted_events = [] for item in filtered_data: data_str = item['data'] # Some ML logic here... prediction = 'normal' if 'sensor_id=beta' in data_str: prediction = 'warning' # as an example predicted_events.append({ 'data': data_str, 'prediction': prediction }) # Save predictions to JSON with open('predicted_events.json', 'w') as f: json.dump(predicted_events, f, indent=2) if __name__ == '__main__': main() import json def main(): # Load filtered events from JSON with open('filtered_events.json', 'r') as f: filtered_data = json.load(f) # Imagine we have a pre-trained model loaded here # For illustration, let's just mock predictions predicted_events = [] for item in filtered_data: data_str = item['data'] # Some ML logic here... prediction = 'normal' if 'sensor_id=beta' in data_str: prediction = 'warning' # as an example predicted_events.append({ 'data': data_str, 'prediction': prediction }) # Save predictions to JSON with open('predicted_events.json', 'w') as f: json.dump(predicted_events, f, indent=2) if __name__ == '__main__': main() Input: filtered_events.json. Output: predicted_events.json with a prediction field for each record. Input: filtered_events.json. Input filtered_events.json Output: predicted_events.json with a prediction field for each record. Output predicted_events.json prediction In a real scenario, you might load a Torch or TensorFlow model and do actual inference. The concept stays the same: read JSON, process, write JSON. Node 4 (Node.js): External Service Notifications Why Node.js? We use it for real-time hooks (Slack notifications, webhooks, etc.), thanks to its event-driven nature and wide range of packages on npm. Example JavaScript code (node_js_notify.js): Example JavaScript code ( node_js_notify.js const fs = require('fs'); const axios = require('axios'); // For sending data to a webhook, Slack, etc. async function main() { // Read the predicted events JSON const rawData = fs.readFileSync('predicted_events.json', 'utf8'); const predictedEvents = JSON.parse(rawData); // For each event, if it's a warning, send an alert for (const event of predictedEvents) { if (event.prediction === 'warning') { await sendAlert(event.data); } } } async function sendAlert(data) { // Example: post to Slack (replace with your actual webhook) const slackWebhook = 'https://hooks.slack.com/services/your/webhook/url'; await axios.post(slackWebhook, { text: `Alert! Check sensor data: ${data}` }); console.log('Alert sent to Slack for:', data); } main().catch(err => { console.error('Error sending alerts:', err); process.exit(1); }); const fs = require('fs'); const axios = require('axios'); // For sending data to a webhook, Slack, etc. async function main() { // Read the predicted events JSON const rawData = fs.readFileSync('predicted_events.json', 'utf8'); const predictedEvents = JSON.parse(rawData); // For each event, if it's a warning, send an alert for (const event of predictedEvents) { if (event.prediction === 'warning') { await sendAlert(event.data); } } } async function sendAlert(data) { // Example: post to Slack (replace with your actual webhook) const slackWebhook = 'https://hooks.slack.com/services/your/webhook/url'; await axios.post(slackWebhook, { text: `Alert! Check sensor data: ${data}` }); console.log('Alert sent to Slack for:', data); } main().catch(err => { console.error('Error sending alerts:', err); process.exit(1); }); Input: predicted_events.json. Output: Not necessarily a JSON output (though you could log to a file). Typically, it just triggers external actions. Input: predicted_events.json. Input predicted_events.json Output: Not necessarily a JSON output (though you could log to a file). Typically, it just triggers external actions. Output Node 5 (Shell Script): Archiving Logs Why Shell/CLI? Classic tools like tar and gzip are reliable workhorses. There’s no need to rewrite them in some other language. They’ve been proven in production for decades. tar gzip Example Shell script (node_shell_archive.sh): Example Shell script ( node_shell_archive.sh #!/usr/bin/env bash # We assume raw_events.json, filtered_events.json, predicted_events.json # are present in the current directory. We'll archive them together. ARCHIVE_NAME="events_archive.tar.gz" tar -czf "$ARCHIVE_NAME" raw_events.json filtered_events.json predicted_events.json echo "Archive created: $ARCHIVE_NAME" #!/usr/bin/env bash # We assume raw_events.json, filtered_events.json, predicted_events.json # are present in the current directory. We'll archive them together. ARCHIVE_NAME="events_archive.tar.gz" tar -czf "$ARCHIVE_NAME" raw_events.json filtered_events.json predicted_events.json echo "Archive created: $ARCHIVE_NAME" Input: The JSON files we produced in the previous steps. Output: events_archive.tar.gz, ready to be stored or uploaded to a long-term storage solution. Input: The JSON files we produced in the previous steps. Input Output: events_archive.tar.gz, ready to be stored or uploaded to a long-term storage solution. Output events_archive.tar.gz The Role of Data Adapters In our setup, each node declares what type of data it expects to read and what it produces. But instead of tying every node to a single format like JSON, we rely on adapters to handle the actual loading and saving of data in the target language. An adapter is just a small function or library call that knows how to read or write a specific data type (files, streams, binary formats, you name it) within a particular runtime. adapters Example: adapter_input_json("filtered_events.json") might parse a JSON file into native objects in Python or Go. adapter_output_csv("results.csv") could serialize an array of events into a CSV string and save it. Example: adapter_input_json("filtered_events.json") might parse a JSON file into native objects in Python or Go. adapter_output_csv("results.csv") could serialize an array of events into a CSV string and save it. Example: adapter_input_json("filtered_events.json") might parse a JSON file into native objects in Python or Go. adapter_output_csv("results.csv") could serialize an array of events into a CSV string and save it. adapter_input_json("filtered_events.json") might parse a JSON file into native objects in Python or Go. adapter_input_json("filtered_events.json") adapter_output_csv("results.csv") could serialize an array of events into a CSV string and save it. adapter_output_csv("results.csv") The beauty of this approach is that each node can seamlessly work with different data representations, as long as there’s an adapter for them. Whether you prefer JSON, CSV, binary blobs, or even specialized formats (like Parquet or Avro), adapters keep the underlying implementation details out of the node’s logic. That way, Rust stays focused on ingestion performance, Python remains laser-focused on ML tasks, and Node.js is free to handle whatever output format suits your external APIs—without forcing the entire pipeline to stick to one rigid standard. Rust stays focused on ingestion performance, Python remains laser-focused on ML tasks, and Node.js is free to handle whatever output format suits your external APIs—without forcing the entire pipeline to stick to one rigid standard. Final Thoughts By unifying everything into a single computational graph and adopting a local FaaS mindset, we drastically reduced the friction between teams. The Rust folks can keep optimizing throughput without breaking anything downstream, Go developers can refine their filtering logic, Python ML engineers can retrain or tweak models, and our Node.js layer can easily tie into Slack. Even our DevOps guys are happy to keep using battle-tested CLI tools for archives. Before we set this up, we spent way too much time managing glue code and random integration points. Now, each team can just focus on their own node, with minimal dependencies on everyone else. If you’re dealing with a multi-language environment—or foresee that one tool won’t suffice for your entire pipeline—this sort of local-first FaaS approach might be exactly what you need.