A deal gets created, a contact gets created, an account gets created… and then the audit trail tells three different stories depending on whether you’re looking at webhooks, API logs, or what the CRM UI eventually shows. When I first tried to audit our CRM workflow, I assumed the raw event feed would behave like a clean ledger: one event per state change, perfectly ordered, perfectly delivered. That assumption didn’t survive first contact with reality. So I started treating the audit trail like a noisy sensor: it’s reporting on a real underlying process, but it drops readings, repeats readings, and sometimes reports them late. noisy sensor The system I ended up building does four things: Normalize heterogeneous records into one event shape. Sessionize events by deal_id, including a first-step “creation burst” when Contact+Deal+Account are created together. Compress each session into a canonical state path. Extract a transition graph whose edges carry both counts and observed durations. Normalize heterogeneous records into one event shape. Normalize Sessionize events by deal_id, including a first-step “creation burst” when Contact+Deal+Account are created together. Sessionize deal_id Compress each session into a canonical state path. Compress Extract a transition graph whose edges carry both counts and observed durations. Extract counts observed durations The hard part isn’t drawing the graph. It’s deciding what the graph even means when you only partially observe the underlying process. means Key insight: deal_id is the spine deal_id The most stabilizing decision in this audit pipeline is also the simplest: I treat deal_id as the primary key for process reconstruction. deal_id Not because deals are the only entity that matters, but because they’re the one entity that reliably ties the story together across: Event streams from webhooks Event streams from API logs Event streams from webhooks Event streams from webhooks Event streams from API logs Event streams from API logs Everything else—contact IDs, account IDs, UI timestamps—can be missing, duplicated, reordered, or delayed. But if I can anchor events to a deal, I can reconstruct a timeline that’s good enough to reason about. How it works end-to-end (timeline → path → graph) The pipeline has three representations of the same underlying thing: Event timeline (raw, noisy) Compressed state path (canonical labels, sessionized) Transition graph (adjacency matrix with time‑weighted edges) Event timeline (raw, noisy) Event timeline Compressed state path (canonical labels, sessionized) Compressed state path Transition graph (adjacency matrix with time‑weighted edges) Transition graph Exactly one diagram, because one is all you need if it’s the right one. The rest of this post walks through each step with the concrete shapes I use, the wrong turn I took early, and the two ways I handle missing states without lying to myself. Preprocessing: event normalization and canonical state labels Webhooks and API logs don’t share a schema. So the first job is to normalize them into a single event record. The normalized shape I need is driven by what I do later (sessionization and transition extraction). At minimum: deal_id (primary key) timestamp source (webhook vs API log vs snapshot) entity_type (deal/contact/account) event_type (created/updated/etc.) canonical_state (derived label) deal_id (primary key) deal_id timestamp timestamp source (webhook vs API log vs snapshot) source entity_type (deal/contact/account) entity_type event_type (created/updated/etc.) event_type canonical_state (derived label) canonical_state In my system, the raw payload formats and the canonical label mapping rules are environment-specific, and I’m not going to paste them here. What I can do is make the contract explicit, and make it impossible to accidentally treat a stub as a working normalizer. can from __future__ import annotations from dataclasses import dataclass from datetime import datetime from typing import Any, Dict, Optional @dataclass(frozen=True) class NormalizedEvent: deal_id: str timestamp: datetime source: str # e.g., "webhook", "api_log", "snapshot" entity_type: str # e.g., "deal", "contact", "account" event_type: str # e.g., "created", "updated" canonical_state: str raw: Dict[str, Any] def normalize_event(raw: Dict[str, Any], *, source: str) -> Optional[NormalizedEvent]: """Normalize a raw webhook/log/snapshot record into a common event shape. This function is intentionally an interface example. Why: the exact field extraction rules depend on your CRM payload schemas and your canonical state vocabulary. Safety: raising here prevents a silent 'return None' footgun that can corrupt downstream counts. """ raise NotImplementedError( "Implement payload parsing + canonical state mapping for your sources." ) from __future__ import annotations from dataclasses import dataclass from datetime import datetime from typing import Any, Dict, Optional @dataclass(frozen=True) class NormalizedEvent: deal_id: str timestamp: datetime source: str # e.g., "webhook", "api_log", "snapshot" entity_type: str # e.g., "deal", "contact", "account" event_type: str # e.g., "created", "updated" canonical_state: str raw: Dict[str, Any] def normalize_event(raw: Dict[str, Any], *, source: str) -> Optional[NormalizedEvent]: """Normalize a raw webhook/log/snapshot record into a common event shape. This function is intentionally an interface example. Why: the exact field extraction rules depend on your CRM payload schemas and your canonical state vocabulary. Safety: raising here prevents a silent 'return None' footgun that can corrupt downstream counts. """ raise NotImplementedError( "Implement payload parsing + canonical state mapping for your sources." ) Canonical state labels aren’t just a prettier name for a stage. They’re a projection of heterogeneous events into a state-machine vocabulary you control. Canonical state labels projection If you don’t control the vocabulary, you can’t compare runs. Sessionization: grouping events into process runs A single deal_id can have multiple “runs” worth of activity depending on how your business operates (re-opened deals, retries, manual edits). That’s why sessionization exists. deal_id Sessionization does two jobs: Split a long event list into contiguous sessions based on time gaps. Merge closely related entity creation events (Contact+Deal+Account) into a single “creation burst” so the initial state path isn’t fragmented. Split a long event list into contiguous sessions based on time gaps. Merge closely related entity creation events (Contact+Deal+Account) into a single “creation burst” so the initial state path isn’t fragmented. Unlike normalization, sessionization can be demonstrated generically because it only depends on timestamps and a couple of tunable time windows. can from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timedelta from typing import Dict, Iterable, List, Sequence # NormalizedEvent is defined in the normalization step above. @dataclass(frozen=True) class Session: deal_id: str events: List[NormalizedEvent] def sessionize( events: Iterable[NormalizedEvent], *, gap: timedelta, creation_burst_window: timedelta, ) -> List[Session]: """Group normalized events into sessions per deal_id. Rules: - Events are sorted by timestamp. - A new session starts when the gap between consecutive events exceeds `gap`. - A 'creation burst' is a short window at the start of a session; if multiple creation events across entity types land in that window, they are kept together as the single start of the session (not split into separate sessions). Note: creation-burst handling here is deliberately conservative: it does not rewrite states; it only prevents the session boundary logic from fragmenting the initial bundle. """ by_deal: Dict[str, List[NormalizedEvent]] = {} for e in events: by_deal.setdefault(e.deal_id, []).append(e) out: List[Session] = [] for deal_id, evs in by_deal.items(): evs_sorted = sorted(evs, key=lambda x: x.timestamp) if not evs_sorted: continue current: List[NormalizedEvent] = [evs_sorted[0]] session_start = evs_sorted[0].timestamp for e in evs_sorted[1:]: prev = current[-1] dt = e.timestamp - prev.timestamp # If we're still inside the initial creation-burst window, never split. in_creation_burst = (prev.timestamp - session_start) <= creation_burst_window if (dt > gap) and (not in_creation_burst): out.append(Session(deal_id=deal_id, events=current)) current = [e] session_start = e.timestamp else: current.append(e) out.append(Session(deal_id=deal_id, events=current)) return out from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timedelta from typing import Dict, Iterable, List, Sequence # NormalizedEvent is defined in the normalization step above. @dataclass(frozen=True) class Session: deal_id: str events: List[NormalizedEvent] def sessionize( events: Iterable[NormalizedEvent], *, gap: timedelta, creation_burst_window: timedelta, ) -> List[Session]: """Group normalized events into sessions per deal_id. Rules: - Events are sorted by timestamp. - A new session starts when the gap between consecutive events exceeds `gap`. - A 'creation burst' is a short window at the start of a session; if multiple creation events across entity types land in that window, they are kept together as the single start of the session (not split into separate sessions). Note: creation-burst handling here is deliberately conservative: it does not rewrite states; it only prevents the session boundary logic from fragmenting the initial bundle. """ by_deal: Dict[str, List[NormalizedEvent]] = {} for e in events: by_deal.setdefault(e.deal_id, []).append(e) out: List[Session] = [] for deal_id, evs in by_deal.items(): evs_sorted = sorted(evs, key=lambda x: x.timestamp) if not evs_sorted: continue current: List[NormalizedEvent] = [evs_sorted[0]] session_start = evs_sorted[0].timestamp for e in evs_sorted[1:]: prev = current[-1] dt = e.timestamp - prev.timestamp # If we're still inside the initial creation-burst window, never split. in_creation_burst = (prev.timestamp - session_start) <= creation_burst_window if (dt > gap) and (not in_creation_burst): out.append(Session(deal_id=deal_id, events=current)) current = [e] session_start = e.timestamp else: current.append(e) out.append(Session(deal_id=deal_id, events=current)) return out Why this matters: transition extraction assumes you have a coherent sequence. If you let a creation bundle appear as three independent starts, your graph will over-count early-stage transitions and under-count the “true” first state. Transition extraction: adjacency with time‑weighted edges Once I have a session, I compress it into a state path: remove redundant repeats (same canonical state repeated by multiple updates) keep the first timestamp for each state occurrence remove redundant repeats (same canonical state repeated by multiple updates) keep the first timestamp for each state occurrence Then I extract transitions: (state_i -> state_{i+1}). (state_i -> state_{i+1}) “Time‑weighted edges” means each edge carries both: a count (how often did this transition occur) a list of observed durations (how long did it take) a count (how often did this transition occur) a list of observed durations (how long did it take) I store durations rather than pre-aggregating them, because averages can hide multi-modal behavior. from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timedelta from typing import Dict, List, Tuple State = str Edge = Tuple[State, State] @dataclass class EdgeStats: count: int durations: List[timedelta] def build_adjacency_matrix( state_path: List[Tuple[State, datetime]], ) -> Dict[Edge, EdgeStats]: """Build an adjacency map from a timestamped state path. Input: [(state, entered_at), ...] in chronological order. Output: {(from_state, to_state): EdgeStats(count, durations)} The 'time-weight' stored here is the raw observed duration per transition. """ adjacency: Dict[Edge, EdgeStats] = {} for (s1, t1), (s2, t2) in zip(state_path, state_path[1:]): edge = (s1, s2) dt = t2 - t1 if edge not in adjacency: adjacency[edge] = EdgeStats(count=0, durations=[]) adjacency[edge].count += 1 adjacency[edge].durations.append(dt) return adjacency from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timedelta from typing import Dict, List, Tuple State = str Edge = Tuple[State, State] @dataclass class EdgeStats: count: int durations: List[timedelta] def build_adjacency_matrix( state_path: List[Tuple[State, datetime]], ) -> Dict[Edge, EdgeStats]: """Build an adjacency map from a timestamped state path. Input: [(state, entered_at), ...] in chronological order. Output: {(from_state, to_state): EdgeStats(count, durations)} The 'time-weight' stored here is the raw observed duration per transition. """ adjacency: Dict[Edge, EdgeStats] = {} for (s1, t1), (s2, t2) in zip(state_path, state_path[1:]): edge = (s1, s2) dt = t2 - t1 if edge not in adjacency: adjacency[edge] = EdgeStats(count=0, durations=[]) adjacency[edge].count += 1 adjacency[edge].durations.append(dt) return adjacency Partial observability: why naive counts lie Webhooks don’t always arrive. API logs don’t always include everything. That means you can observe: A -> C in your timeline A -> C in your timeline A -> C …even though the real process was: A -> B -> C A -> B -> C A -> B -> C If you naively count A -> C, you will: A -> C inflate shortcut edges deflate the missing state’s incoming/outgoing edges misidentify bottlenecks (because time spent in B disappears) inflate shortcut edges deflate the missing state’s incoming/outgoing edges misidentify bottlenecks (because time spent in B disappears) B One mitigation I use is to augment event streams with periodic database snapshots of current entity state. When a snapshot implies a state that never appeared in the event stream, I insert an imputed event tagged with its source so it can’t be confused with an observed transition. database snapshots imputed That doesn’t magically give you truth. It reduces a systematic bias: “missing delivery looks like a shortcut.” EM over missing states (conceptual only) Below is conceptual pseudocode—not runnable code. A real EM implementation needs your concrete state space, constraints, and safety checks (convergence criteria, caps on path explosion, and validation that probabilities stay well-formed). conceptual pseudocode PSEUDO-CODE — for conceptual illustration only (DO NOT RUN) Goal: Estimate transition probabilities while accounting for possibly-missing intermediate states. initialize T from observed adjacent transitions (or uniformly) repeat until convergence: expected_counts = 0 for each observed path: for each adjacent observed pair (A, C): if missing intermediates are possible between A and C: infer a distribution over candidate hidden sequences A -> ... -> C using current T add fractional expected counts along each candidate sequence else: expected_counts[A,C] += 1 normalize each row of expected_counts into T return T PSEUDO-CODE — for conceptual illustration only (DO NOT RUN) Goal: Estimate transition probabilities while accounting for possibly-missing intermediate states. initialize T from observed adjacent transitions (or uniformly) repeat until convergence: expected_counts = 0 for each observed path: for each adjacent observed pair (A, C): if missing intermediates are possible between A and C: infer a distribution over candidate hidden sequences A -> ... -> C using current T add fractional expected counts along each candidate sequence else: expected_counts[A,C] += 1 normalize each row of expected_counts into T return T The point of EM here isn’t magic—it’s honesty. Instead of declaring “it was A→C,” you spread probability mass across plausible intermediates. Entropy-aware tie-breaker for ambiguous transitions Sometimes I don’t want the complexity of EM, or I need a deterministic path for visualization. In that case: If multiple next states are plausible, I pick the one with the lowest uncertainty only if that uncertainty is below a configured tolerance. Otherwise, I split the transition into multiple edges with fractional counts. If multiple next states are plausible, I pick the one with the lowest uncertainty only if that uncertainty is below a configured tolerance. only if Otherwise, I split the transition into multiple edges with fractional counts. That second branch is the whole point: preserve ambiguity instead of forcing a single edge. Analytic outputs: heatmaps, CDFs, and shortcut detection Once you have an adjacency matrix with per-edge durations, three outputs are straightforward. Bottleneck heatmaps Heatmaps are a visualization layer over the adjacency matrix. X axis: from_state Y axis: to_state cell value: count, median duration, or another aggregation X axis: from_state Y axis: to_state cell value: count, median duration, or another aggregation Mean-time-to-transition CDFs Averages are brittle. CDFs tell you what fraction of transitions complete within a given time. Given EdgeStats.durations, you can compute an empirical CDF per edge and compare edges to find “slow tails” that don’t show up in mean/median. EdgeStats.durations Anomalous shortcut detection A “shortcut” is an edge that appears in the graph but violates the expected ordering implied by your canonical state model. Method: define allowed transitions (your expected adjacency) flag observed edges outside that set rank by count and/or time impact define allowed transitions (your expected adjacency) flag observed edges outside that set rank by count and/or time impact What went wrong first: I trusted arrival order My first implementation treated the event stream as inherently ordered: whatever arrived first must have happened first. That was the wrong architecture. Once I mixed sources (webhooks + logs), arrival order became a meaningless artifact of network timing. The fix was to make timestamp ordering a first-class requirement: parse timestamps sort within deal_id only then sessionize parse timestamps sort within deal_id only then sessionize The symptom that finally forced the change was simple: I kept chasing “phantom shortcuts” that disappeared the moment I stopped trusting arrival order. Closing Once I anchored process reconstruction on deal_id, normalized the vocabulary, sessionized creation bursts, and let edges carry both counts and observed durations, the audit trail stopped being a pile of contradictory stories and started behaving like a model I could interrogate—without pretending the missing pieces weren’t missing. deal_id