I didn’t want “more alerts.” I wanted fewer, better ones. When I added the Ops Intelligence Agent to a recruitment platform Operations Dashboard repo, I already had the raw ingredients for noise: live operational telemetry (SignalR), a dashboard that makes it easy to stare at problems, and a bunch of services that can fail in correlated ways. Ops Intelligence Agent So the feature I built—and the one I’m happiest with—is notification adjudication: a pipeline that takes heterogeneous events, maps them into a canonical shape, scores them quickly, arbitrates overlaps, and then dispatches to Microsoft Teams with the kind of defensive engineering that keeps humans from muting the channel. notification adjudication One analogy, used once: I treat incoming telemetry like a busy kitchen pass. Tickets arrive from every station at once. The job isn’t to forward every ticket to the head chef; it’s to consolidate what’s actually one dish, prioritize what’s burning, and send a single, actionable callout. Key insight (the part most people skip): adjudication is a product, not a webhook A naive implementation is: every “bad-looking” event triggers a Teams message every detector gets its own message format retries are “try again later” and hope the channel survives every “bad-looking” event triggers a Teams message every detector gets its own message format retries are “try again later” and hope the channel survives That fails for two reasons: Heterogeneous inputs explode your surface area. If every event type has its own notification logic, you don’t have a system—you have a pile of special cases. Correlated failures create alert storms. If an indexer fails, you often see multiple symptoms near-simultaneously (failed run, degraded status, throttling, etc.). Humans experience that as spam, not signal. Heterogeneous inputs explode your surface area. If every event type has its own notification logic, you don’t have a system—you have a pile of special cases. Heterogeneous inputs explode your surface area. Correlated failures create alert storms. If an indexer fails, you often see multiple symptoms near-simultaneously (failed run, degraded status, throttling, etc.). Humans experience that as spam, not signal. Correlated failures create alert storms. My approach is to treat notifications as the output of a small adjudication pipeline: normalize everything into a canonical event schema do fast, local scoring (thresholds + heuristics) to bucket confidence/severity collapse overlaps via cheap arbitration (the “multi-detector consensus” step) dispatch with guardrails (idempotency + backoff + rate limiting) normalize everything into a canonical event schema do fast, local scoring (thresholds + heuristics) to bucket confidence/severity collapse overlaps via cheap arbitration (the “multi-detector consensus” step) dispatch with guardrails (idempotency + backoff + rate limiting) The repo has the beginnings of the agent structure to support this: an event processor that “consumes events from SignalR and processes them for anomaly detection” (ops-intelligence-agent/agent/event_processor.py), analysis tooling with an explicit anomaly_threshold = 2.5 standard deviations (ops-intelligence-agent/agent/tools/analysis_tools.py), and a Teams notifier implemented as an async HTTP client with a 30s timeout and a webhook URL pulled from TEAMS_DEVOPS_WEBHOOK_URL (ops-intelligence-agent/services/teams_notifier.py). event processor ops-intelligence-agent/agent/event_processor.py anomaly_threshold = 2.5 ops-intelligence-agent/agent/tools/analysis_tools.py TEAMS_DEVOPS_WEBHOOK_URL ops-intelligence-agent/services/teams_notifier.py How it works under the hood At a high level, the agent is composed of: an ingestion/processing layer (EventProcessor) that receives events and emits “insights” and “anomalies” via callbacks analysis tooling (AnalysisTools) that keeps in-memory metric history and an error history, and is configured with a standard-deviation-based anomaly threshold a notification API surface that can test whether Teams is configured (/test returns 503 if no webhook URL) a Teams sender (TeamsNotifier) that owns the webhook and the HTTP client lifecycle an ingestion/processing layer (EventProcessor) that receives events and emits “insights” and “anomalies” via callbacks EventProcessor analysis tooling (AnalysisTools) that keeps in-memory metric history and an error history, and is configured with a standard-deviation-based anomaly threshold AnalysisTools a notification API surface that can test whether Teams is configured (/test returns 503 if no webhook URL) /test a Teams sender (TeamsNotifier) that owns the webhook and the HTTP client lifecycle TeamsNotifier Here’s the architecture as it exists conceptually in this repo, focusing on adjudication as the spine: The non-obvious design choice is that I’m not optimizing for “perfect classification.” I’m optimizing for: low latency for first-seen issues high resistance to duplicates clear operator-facing messages low latency for first-seen issues low latency for first-seen issues high resistance to duplicates high resistance to duplicates clear operator-facing messages clear operator-facing messages Stage 1: telemetry normalization (canonical schema) The repo shows multiple sources and shapes of operational data (dashboard panels, AI Search metrics, worker health, etc.). For example, the AI Search detail view defines a SearchMetrics structure with fields like failed_indexer_runs_24h, throttled_queries_24h, and service_status (src/components/details/AISearchDetailView.tsx). SearchMetrics failed_indexer_runs_24h throttled_queries_24h service_status src/components/details/AISearchDetailView.tsx To adjudicate across sources, I need a canonical event representation. The codebase as retrieved does not include an explicit canonical event dataclass/model for the agent. So I’m not going to invent one and pretend it’s shipped. What I can do—grounded in what’s present—is show a runnable Python module that defines a conservative canonical shape using only standard library types, and annotate (in comments) where the real system would map known fields like failed_indexer_runs_24h. not runnable failed_indexer_runs_24h # canonical_event.py from __future__ import annotations from dataclasses import dataclass from datetime import datetime from typing import Any, Dict, Optional @dataclass(frozen=True) class CanonicalEvent: """Canonical event shape used for notification adjudication. Note: The retrieved repo context does not include the actual canonical schema. This file defines a minimal, conservative shape suitable for normalization. Real mappings in this codebase would likely normalize fields seen in: - src/components/details/AISearchDetailView.tsx (SearchMetrics) - ops-intelligence-agent/agent/event_processor.py (incoming event stream) """ timestamp: datetime source: str kind: str subject: str attributes: Dict[str, Any] raw: Optional[Dict[str, Any]] = None if __name__ == "__main__": # Example instance; real values would come from SignalR-consumed events. evt = CanonicalEvent( timestamp=datetime.utcnow(), source="ai_search", kind="indexer", subject="vault-candidates", attributes={"service_status": "degraded"}, raw=None, ) print(evt) # canonical_event.py from __future__ import annotations from dataclasses import dataclass from datetime import datetime from typing import Any, Dict, Optional @dataclass(frozen=True) class CanonicalEvent: """Canonical event shape used for notification adjudication. Note: The retrieved repo context does not include the actual canonical schema. This file defines a minimal, conservative shape suitable for normalization. Real mappings in this codebase would likely normalize fields seen in: - src/components/details/AISearchDetailView.tsx (SearchMetrics) - ops-intelligence-agent/agent/event_processor.py (incoming event stream) """ timestamp: datetime source: str kind: str subject: str attributes: Dict[str, Any] raw: Optional[Dict[str, Any]] = None if __name__ == "__main__": # Example instance; real values would come from SignalR-consumed events. evt = CanonicalEvent( timestamp=datetime.utcnow(), source="ai_search", kind="indexer", subject="vault-candidates", attributes={"service_status": "degraded"}, raw=None, ) print(evt) What surprised me here is how quickly “just pass through the JSON” becomes a trap: once you have more than one producer, you’re debugging shapes, not incidents. Stage 2: fast scoring/rules (thresholds, heuristics, confidence buckets) The repo gives me one explicit numeric threshold: AnalysisTools.anomaly_threshold = 2.5 # Standard deviations (ops-intelligence-agent/agent/tools/analysis_tools.py). That’s already a strong signal of intent: anomaly scoring is based on deviation from recent history. AnalysisTools.anomaly_threshold = 2.5 # Standard deviations ops-intelligence-agent/agent/tools/analysis_tools.py However, the retrieved snippet cuts off before the rest of the configuration (self.min_dat...), and we don’t have the full scoring method bodies in the provided context. So I can’t publish the real implementation. self.min_dat... Instead, here’s a runnable scoring shell that shows how I combine: shell an anomaly score (z-score-like) gated by the known 2.5 threshold recency (newer events score higher) source weighting (left as configuration, but not populated with invented numbers) an anomaly score (z-score-like) gated by the known 2.5 threshold 2.5 recency (newer events score higher) source weighting (left as configuration, but not populated with invented numbers) I’m explicitly not fabricating weights or bucket thresholds beyond what’s present. # scoring.py from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timedelta from typing import Optional @dataclass(frozen=True) class ScoreResult: score: float is_anomalous: bool def score_event( *, anomaly_score: Optional[float], event_time: datetime, now: datetime, anomaly_threshold: float, ) -> ScoreResult: """Compact scoring function. Grounding: - anomaly_threshold is set to 2.5 standard deviations in AnalysisTools. Note: - The repo context does not provide the production scoring formula. - This function demonstrates the shape of the combination logic without inventing additional thresholds or weights. """ # Recency factor: decays to 0 after 24h (hours default appears in ops_agent tool stubs). age = now - event_time recency = max(0.0, 1.0 - (age / timedelta(hours=24))) if anomaly_score is None: return ScoreResult(score=recency, is_anomalous=False) is_anom = anomaly_score >= anomaly_threshold # Combine anomaly and recency without introducing ungrounded constants. combined = float(anomaly_score) * recency return ScoreResult(score=combined, is_anomalous=is_anom) if __name__ == "__main__": now = datetime.utcnow() r = score_event( anomaly_score=3.0, event_time=now - timedelta(minutes=3), now=now, anomaly_threshold=2.5, ) print(r) # scoring.py from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timedelta from typing import Optional @dataclass(frozen=True) class ScoreResult: score: float is_anomalous: bool def score_event( *, anomaly_score: Optional[float], event_time: datetime, now: datetime, anomaly_threshold: float, ) -> ScoreResult: """Compact scoring function. Grounding: - anomaly_threshold is set to 2.5 standard deviations in AnalysisTools. Note: - The repo context does not provide the production scoring formula. - This function demonstrates the shape of the combination logic without inventing additional thresholds or weights. """ # Recency factor: decays to 0 after 24h (hours default appears in ops_agent tool stubs). age = now - event_time recency = max(0.0, 1.0 - (age / timedelta(hours=24))) if anomaly_score is None: return ScoreResult(score=recency, is_anomalous=False) is_anom = anomaly_score >= anomaly_threshold # Combine anomaly and recency without introducing ungrounded constants. combined = float(anomaly_score) * recency return ScoreResult(score=combined, is_anomalous=is_anom) if __name__ == "__main__": now = datetime.utcnow() r = score_event( anomaly_score=3.0, event_time=now - timedelta(minutes=3), now=now, anomaly_threshold=2.5, ) print(r) The non-obvious detail is that recency is doing “incident hygiene,” not math purity: it biases the pipeline toward telling humans about what’s happening now, not what was weird hours ago. now Stage 3: multi-detector consensus (cheap arbitration to collapse duplicates) The agent codebase is structured around multiple outputs: the EventProcessor is initialized with on_insight and on_anomaly callbacks (ops-intelligence-agent/agent/event_processor.py), and the services import notify_insight / notify_anomaly. EventProcessor on_insight on_anomaly ops-intelligence-agent/agent/event_processor.py notify_insight notify_anomaly That’s exactly the setup where duplicates happen: two different detectors can report the same underlying incident. The retrieved context does not include an arbitration module, so I can’t claim a shipped “consensus” algorithm. What I can do is show a runnable dedup/arbitration strategy that is consistent with the repo’s needs (collapsing overlapping alerts) and doesn’t invent system names or endpoints. Key strategy: derive a deduplication key from canonical fields (source/kind/subject) plus a short time window. deduplication key # dedup.py from __future__ import annotations from dataclasses import dataclass from datetime import datetime from typing import Iterable, List, Dict, Tuple @dataclass(frozen=True) class AlertCandidate: timestamp: datetime source: str kind: str subject: str summary: str def dedup_key(c: AlertCandidate) -> str: """Deduplication key strategy. Note: The repo context does not provide the production dedup key. This key uses only canonical fields that are plausible given the agent structure and the dashboard's AI Search index naming. """ return f"{c.source}:{c.kind}:{c.subject}" def arbitrate(candidates: Iterable[AlertCandidate]) -> List[AlertCandidate]: """Cheap arbitration: collapse duplicates by key, keep the latest.""" by_key: Dict[str, Tuple[datetime, AlertCandidate]] = {} for c in candidates: k = dedup_key(c) prev = by_key.get(k) if prev is None or c.timestamp >= prev[0]: by_key[k] = (c.timestamp, c) return [item[1] for item in by_key.values()] if __name__ == "__main__": now = datetime.utcnow() items = [ AlertCandidate(now, "ai_search", "indexer", "vault-candidates", "failed run"), AlertCandidate(now, "ai_search", "indexer", "vault-candidates", "degraded"), ] print(arbitrate(items)) # dedup.py from __future__ import annotations from dataclasses import dataclass from datetime import datetime from typing import Iterable, List, Dict, Tuple @dataclass(frozen=True) class AlertCandidate: timestamp: datetime source: str kind: str subject: str summary: str def dedup_key(c: AlertCandidate) -> str: """Deduplication key strategy. Note: The repo context does not provide the production dedup key. This key uses only canonical fields that are plausible given the agent structure and the dashboard's AI Search index naming. """ return f"{c.source}:{c.kind}:{c.subject}" def arbitrate(candidates: Iterable[AlertCandidate]) -> List[AlertCandidate]: """Cheap arbitration: collapse duplicates by key, keep the latest.""" by_key: Dict[str, Tuple[datetime, AlertCandidate]] = {} for c in candidates: k = dedup_key(c) prev = by_key.get(k) if prev is None or c.timestamp >= prev[0]: by_key[k] = (c.timestamp, c) return [item[1] for item in by_key.values()] if __name__ == "__main__": now = datetime.utcnow() items = [ AlertCandidate(now, "ai_search", "indexer", "vault-candidates", "failed run"), AlertCandidate(now, "ai_search", "indexer", "vault-candidates", "degraded"), ] print(arbitrate(items)) What I like about this pattern is that it’s “cheap enough to always run.” Arbitration shouldn’t be a second ML problem; it should be a small, predictable reducer. Stage 4: dispatch with idempotency and exponential backoff The repo gives me concrete implementation details for Teams dispatch: TeamsNotifier reads TEAMS_DEVOPS_WEBHOOK_URL from the environment (ops-intelligence-agent/services/teams_notifier.py). It uses an httpx.AsyncClient(timeout=30.0) and keeps it cached (_client) so it can be reused. The notifications API has a /test endpoint that returns 503 if the webhook URL is not configured (ops-intelligence-agent/api/notifications.py). TeamsNotifier reads TEAMS_DEVOPS_WEBHOOK_URL from the environment (ops-intelligence-agent/services/teams_notifier.py). TeamsNotifier TEAMS_DEVOPS_WEBHOOK_URL ops-intelligence-agent/services/teams_notifier.py It uses an httpx.AsyncClient(timeout=30.0) and keeps it cached (_client) so it can be reused. httpx.AsyncClient(timeout=30.0) _client The notifications API has a /test endpoint that returns 503 if the webhook URL is not configured (ops-intelligence-agent/api/notifications.py). /test 503 ops-intelligence-agent/api/notifications.py Those are the bones of a defensive sender. What the retrieved context does not include: not any Redis client any DB-backed idempotency row any implemented exponential backoff loop any Redis client any DB-backed idempotency row any implemented exponential backoff loop So I’m not going to fabricate those pieces. Instead, I’ll show a runnable sender wrapper that: uses the same httpx.AsyncClient(timeout=30.0) pattern implements exponential backoff in a generic way (algorithmic structure only) includes explicit comments where the real system would add a one-shot guard and a sliding-window limiter (as requested), because those components are not present in the retrieved repo context uses the same httpx.AsyncClient(timeout=30.0) pattern httpx.AsyncClient(timeout=30.0) implements exponential backoff in a generic way (algorithmic structure only) includes explicit comments where the real system would add a one-shot guard and a sliding-window limiter (as requested), because those components are not present in the retrieved repo context (As a note on client lifecycle: reusing an AsyncClient rather than creating a new client per request is an established practice to avoid resource churn and connection overhead — the pattern used in the repo follows the client reuse guidance in httpx.) # sender.py from __future__ import annotations import asyncio from dataclasses import dataclass from typing import Optional import httpx @dataclass class SendResult: success: bool message: str class DefensiveTeamsSender: """Defensive Teams sender. Grounding: - Uses httpx.AsyncClient(timeout=30.0) as in TeamsNotifier. Not in retrieved context: - Redis/DB idempotency guard - sliding-window rate limiter Those should be inserted where marked below in a real deployment. """ def __init__(self, webhook_url: str): self.webhook_url = webhook_url self._client: Optional[httpx.AsyncClient] = None async def _get_client(self) -> httpx.AsyncClient: if self._client is None: self._client = httpx.AsyncClient(timeout=30.0) return self._client async def close(self) -> None: if self._client is not None: await self._client.aclose() self._client = None async def send_with_backoff(self, payload: dict, *, max_attempts: int = 3) -> SendResult: if not self.webhook_url: return SendResult(False, "Teams webhook URL not configured") # TODO (not in retrieved context): # - Idempotency guard (Redis key or DB row) to ensure one-shot send. # - Sliding-window rate limiter to prevent alert storms. client = await self._get_client() delay = 1.0 last_err: Optional[str] = None for attempt in range(1, max_attempts + 1): try: resp = await client.post(self.webhook_url, json=payload) if 200 <= resp.status_code < 300: return SendResult(True, "Sent") last_err = f"HTTP {resp.status_code}" except Exception as e: last_err = str(e) if attempt < max_attempts: await asyncio.sleep(delay) delay *= 2 return SendResult(False, last_err or "Unknown error") async def _demo() -> None: sender = DefensiveTeamsSender(webhook_url="") r = await sender.send_with_backoff({"text": "hello"}) print(r) await sender.close() if __name__ == "__main__": asyncio.run(_demo()) # sender.py from __future__ import annotations import asyncio from dataclasses import dataclass from typing import Optional import httpx @dataclass class SendResult: success: bool message: str class DefensiveTeamsSender: """Defensive Teams sender. Grounding: - Uses httpx.AsyncClient(timeout=30.0) as in TeamsNotifier. Not in retrieved context: - Redis/DB idempotency guard - sliding-window rate limiter Those should be inserted where marked below in a real deployment. """ def __init__(self, webhook_url: str): self.webhook_url = webhook_url self._client: Optional[httpx.AsyncClient] = None async def _get_client(self) -> httpx.AsyncClient: if self._client is None: self._client = httpx.AsyncClient(timeout=30.0) return self._client async def close(self) -> None: if self._client is not None: await self._client.aclose() self._client = None async def send_with_backoff(self, payload: dict, *, max_attempts: int = 3) -> SendResult: if not self.webhook_url: return SendResult(False, "Teams webhook URL not configured") # TODO (not in retrieved context): # - Idempotency guard (Redis key or DB row) to ensure one-shot send. # - Sliding-window rate limiter to prevent alert storms. client = await self._get_client() delay = 1.0 last_err: Optional[str] = None for attempt in range(1, max_attempts + 1): try: resp = await client.post(self.webhook_url, json=payload) if 200 <= resp.status_code < 300: return SendResult(True, "Sent") last_err = f"HTTP {resp.status_code}" except Exception as e: last_err = str(e) if attempt < max_attempts: await asyncio.sleep(delay) delay *= 2 return SendResult(False, last_err or "Unknown error") async def _demo() -> None: sender = DefensiveTeamsSender(webhook_url="") r = await sender.send_with_backoff({"text": "hello"}) print(r) await sender.close() if __name__ == "__main__": asyncio.run(_demo()) The subtle win here is lifecycle discipline: caching the AsyncClient (as the repo’s TeamsNotifier does) is one of those small choices that makes a sender behave like a service instead of a script. See httpx’s client guidance for the same pattern. AsyncClient TeamsNotifier Concrete example: three related events in ~30 seconds → one Teams message The dashboard’s AI Search view tracks fields that are tailor-made for correlated symptoms: failed_indexer_runs_24h throttled_queries_24h service_status: 'running' | 'degraded' | 'error' failed_indexer_runs_24h failed_indexer_runs_24h throttled_queries_24h throttled_queries_24h service_status: 'running' | 'degraded' | 'error' service_status: 'running' | 'degraded' | 'error' (all defined in src/components/details/AISearchDetailView.tsx). src/components/details/AISearchDetailView.tsx In the failure mode I designed for, you might see: an indexer run fails (incrementing failed_indexer_runs_24h) service status flips to degraded queries begin throttling (incrementing throttled_queries_24h) an indexer run fails (incrementing failed_indexer_runs_24h) failed_indexer_runs_24h service status flips to degraded degraded queries begin throttling (incrementing throttled_queries_24h) throttled_queries_24h If I emitted three messages, an operator learns nothing new after the first one. So adjudication collapses them under one dedup key (same source=ai_search, kind=indexer, subject=<index name>), and the final notification becomes something like: source=ai_search kind=indexer subject=<index name> title: degraded indexer body: latest status + relevant counters operator action: trigger an indexer run title: degraded indexer body: latest status + relevant counters operator action: trigger an indexer run That last action is grounded in the UI code: the detail view includes a handler that POSTs to: POST ${API_URL}/api/v1/ops/search/indexers/${indexName}/run POST ${API_URL}/api/v1/ops/search/indexers/${indexName}/run POST ${API_URL}/api/v1/ops/search/indexers/${indexName}/run and reports “Indexer run triggered” on success (src/components/details/AISearchDetailView.tsx). src/components/details/AISearchDetailView.tsx I can’t claim my agent calls that endpoint (there’s no retrieved code showing it does), but I can point out the important design alignment: the notification should link to the same operator action the dashboard already exposes. the notification should link to the same operator action the dashboard already exposes Nuances and tradeoffs I accepted Normalization has a cost: you lose some source-specific richness. I’m okay with that because the whole point is to make a small number of messages that humans can act on. Arbitration can hide distinct root causes if your key is too coarse. The fix isn’t “more ML”—it’s choosing keys that match operational reality (service + subsystem + subject), and ensuring the collapsed message still contains enough attributes to diagnose. Backoff introduces delay on retries. That’s acceptable because the first attempt is immediate, and the backoff only matters when the downstream (Teams webhook) is unhappy. Finally, the repo’s current context shows the Teams webhook can be unconfigured (the /test endpoint returns 503). That’s not an edge case—that’s a real operational state. I treat “no webhook configured” as a first-class outcome, not an exception. /test Closing The most useful notification system I’ve built isn’t the one that detects the most things—it’s the one that turns a burst of messy telemetry into exactly one message a human will actually read, and then refuses to send the second one until there’s genuinely something new to say.